You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 18:03:01 UTC
[01/16] incubator-airflow git commit: [AIRFLOW-1517] Kubernetes
Operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master b48bbbd6f -> 1abe7f6d5
[AIRFLOW-1517] Kubernetes Operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78ff2fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78ff2fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78ff2fc1
Branch: refs/heads/master
Commit: 78ff2fc180808a38c53cc643bd87c509d7540b4a
Parents: c0dffb5
Author: Daniel Imberman <da...@gmail.com>
Authored: Thu Dec 7 09:41:05 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Tue Dec 26 08:45:31 2017 -0800
----------------------------------------------------------------------
.travis.yml | 5 +-
airflow/contrib/kubernetes/__init__.py | 13 +
airflow/contrib/kubernetes/kube_client.py | 34 +++
.../kubernetes_request_factory/__init__.py | 12 +
.../kubernetes_request_factory.py | 162 +++++++++++
.../pod_request_factory.py | 56 ++++
airflow/contrib/kubernetes/pod.py | 92 ++++++
airflow/contrib/kubernetes/pod_generator.py | 278 +++++++++++++++++++
airflow/contrib/kubernetes/pod_launcher.py | 119 ++++++++
airflow/contrib/kubernetes/secret.py | 36 +++
.../operators/kubernetes_pod_operator.py | 71 +++++
.../ci/kubernetes/minikube/start_minikube.sh | 53 ++++
scripts/ci/kubernetes/setup_kubernetes.sh | 28 ++
scripts/ci/requirements.txt | 1 +
scripts/ci/travis_script.sh | 39 +++
setup.py | 6 +-
.../operators/test_kubernetes_pod_operator.py | 69 +++++
17 files changed, 1069 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9b173a9..6b45153 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -89,9 +89,6 @@ before_script:
- psql -c 'create database airflow;' -U postgres
- export PATH=${PATH}:/tmp/hive/bin
script:
- - pip --version
- - ls -l $HOME/.wheelhouse
- - tox --version
- - tox -e $TOX_ENV
+ - ./scripts/ci/travis_script.sh
after_success:
- codecov
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
new file mode 100644
index 0000000..7dc895e
--- /dev/null
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def load_kube_config(in_cluster=True):
+ from kubernetes import config, client
+ if in_cluster:
+ config.load_incluster_config()
+ else:
+ try:
+ config.load_kube_config()
+ return client.CoreV1Api()
+ except NotImplementedError:
+ NotImplementedError(
+ "requires incluster config or defined configuration in airflow.cfg")
+
+
+def get_kube_client(in_cluster=True):
+ # TODO: This should also allow people to point to a cluster.
+
+ from kubernetes import client
+ load_kube_config(in_cluster)
+ return client.CoreV1Api()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
new file mode 100644
index 0000000..9921696
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
new file mode 100644
index 0000000..9398bef
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta, abstractmethod
+import six
+
+
+class KubernetesRequestFactory:
+ """
+ Create requests to be sent to kube API.
+ Extend this class to talk to kubernetes and generate your specific resources.
+ This is equivalent of generating yaml files that can be used by `kubectl`
+ """
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def create(self, pod):
+ """
+ Creates the request for kubernetes API.
+
+ :param pod: The pod object
+ """
+ pass
+
+ @staticmethod
+ def extract_image(pod, req):
+ req['spec']['containers'][0]['image'] = pod.image
+
+ @staticmethod
+ def extract_image_pull_policy(pod, req):
+ if pod.image_pull_policy:
+ req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
+ @staticmethod
+ def add_secret_to_env(env, secret):
+ env.append({
+ 'name': secret.deploy_target,
+ 'valueFrom': {
+ 'secretKeyRef': {
+ 'name': secret.secret,
+ 'key': secret.key
+ }
+ }
+ })
+
+ @staticmethod
+ def extract_labels(pod, req):
+ req['metadata']['labels'] = req['metadata'].get('labels', {})
+ for k, v in six.iteritems(pod.labels):
+ req['metadata']['labels'][k] = v
+
+ @staticmethod
+ def extract_cmds(pod, req):
+ req['spec']['containers'][0]['command'] = pod.cmds
+
+ @staticmethod
+ def extract_args(pod, req):
+ req['spec']['containers'][0]['args'] = pod.args
+
+ @staticmethod
+ def extract_node_selector(pod, req):
+ if len(pod.node_selectors) > 0:
+ req['spec']['nodeSelector'] = pod.node_selectors
+
+ @staticmethod
+ def attach_volumes(pod, req):
+ req['spec']['volumes'] = pod.volumes
+
+ @staticmethod
+ def attach_volume_mounts(pod, req):
+ if len(pod.volume_mounts) > 0:
+ req['spec']['containers'][0]['volumeMounts'] = (
+ req['spec']['containers'][0].get('volumeMounts', []))
+ req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
+
+ @staticmethod
+ def extract_name(pod, req):
+ req['metadata']['name'] = pod.name
+
+ @staticmethod
+ def extract_volume_secrets(pod, req):
+ vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+ if any(vol_secrets):
+ req['spec']['containers'][0]['volumeMounts'] = []
+ req['spec']['volumes'] = []
+ for idx, vol in enumerate(vol_secrets):
+ vol_id = 'secretvol' + str(idx)
+ req['spec']['containers'][0]['volumeMounts'].append({
+ 'mountPath': vol.deploy_target,
+ 'name': vol_id,
+ 'readOnly': True
+ })
+ req['spec']['volumes'].append({
+ 'name': vol_id,
+ 'secret': {
+ 'secretName': vol.secret
+ }
+ })
+
+ @staticmethod
+ def extract_env_and_secrets(pod, req):
+ env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+ if len(pod.envs) > 0 or len(env_secrets) > 0:
+ env = []
+ for k in pod.envs.keys():
+ env.append({'name': k, 'value': pod.envs[k]})
+ for secret in env_secrets:
+ KubernetesRequestFactory.add_secret_to_env(env, secret)
+ req['spec']['containers'][0]['env'] = env
+
+ @staticmethod
+ def extract_resources(pod, req):
+ if not pod.resources or pod.resources.is_empty_resource_request():
+ return
+
+ req['spec']['containers'][0]['resources'] = {}
+
+ if pod.resources.has_requests():
+ req['spec']['containers'][0]['resources']['requests'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'memory'] = pod.resources.request_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'cpu'] = pod.resources.request_cpu
+
+ if pod.resources.has_limits():
+ req['spec']['containers'][0]['resources']['limits'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'memory'] = pod.resources.limit_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'cpu'] = pod.resources.limit_cpu
+
+ @staticmethod
+ def extract_init_containers(pod, req):
+ if pod.init_containers:
+ req['spec']['initContainers'] = pod.init_containers
+
+ @staticmethod
+ def extract_service_account_name(pod, req):
+ if pod.service_account_name:
+ req['spec']['serviceAccountName'] = pod.service_account_name
+
+ @staticmethod
+ def extract_image_pull_secrets(pod, req):
+ if pod.image_pull_secrets:
+ req['spec']['imagePullSecrets'] = [{
+ 'name': pull_secret
+ } for pull_secret in pod.image_pull_secrets.split(',')]
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
new file mode 100644
index 0000000..3be1a13
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import yaml
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+ import KubernetesRequestFactory
+
+
+class SimplePodRequestFactory(KubernetesRequestFactory):
+ """
+ Request generator for a simple pod.
+ """
+ _yaml = """apiVersion: v1
+kind: Pod
+metadata:
+ name: name
+spec:
+ containers:
+ - name: base
+ image: airflow-slave:latest
+ command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
+ restartPolicy: Never
+ """
+
+ def __init__(self):
+ pass
+
+ def create(self, pod):
+ # type: (Pod) -> dict
+ req = yaml.load(self._yaml)
+ self.extract_name(pod, req)
+ self.extract_labels(pod, req)
+ self.extract_image(pod, req)
+ self.extract_image_pull_policy(pod, req)
+ self.extract_cmds(pod, req)
+ self.extract_args(pod, req)
+ self.extract_node_selector(pod, req)
+ self.extract_env_and_secrets(pod, req)
+ self.extract_volume_secrets(pod, req)
+ self.attach_volumes(pod, req)
+ self.attach_volume_mounts(pod, req)
+ self.extract_resources(pod, req)
+ self.extract_service_account_name(pod, req)
+ self.extract_init_containers(pod, req)
+ self.extract_image_pull_secrets(pod, req)
+ return req
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
new file mode 100644
index 0000000..6a9f76d
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod.py
@@ -0,0 +1,92 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Resources:
+ def __init__(
+ self,
+ request_memory=None,
+ request_cpu=None,
+ limit_memory=None,
+ limit_cpu=None):
+ self.request_memory = request_memory
+ self.request_cpu = request_cpu
+ self.limit_memory = limit_memory
+ self.limit_cpu = limit_cpu
+
+ def is_empty_resource_request(self):
+ return not self.has_limits() and not self.has_requests()
+
+ def has_limits(self):
+ return self.limit_cpu is not None or self.limit_memory is not None
+
+ def has_requests(self):
+ return self.request_cpu is not None or self.request_memory is not None
+
+
+class Pod:
+ """
+ Represents a kubernetes pod and manages execution of a single pod.
+ :param image: The docker image
+ :type image: str
+ :param env: A dict containing the environment variables
+ :type env: dict
+ :param cmds: The command to be run on the pod
+ :type cmd: list str
+ :param secrets: Secrets to be launched to the pod
+ :type secrets: list Secret
+ :param result: The result that will be returned to the operator after
+ successful execution of the pod
+ :type result: any
+ """
+ pod_timeout = 3600
+
+ def __init__(
+ self,
+ image,
+ envs,
+ cmds,
+ args=None,
+ secrets=None,
+ labels=None,
+ node_selectors=None,
+ name=None,
+ volumes=None,
+ volume_mounts=None,
+ namespace='default',
+ result=None,
+ image_pull_policy="IfNotPresent",
+ image_pull_secrets=None,
+ init_containers=None,
+ service_account_name=None,
+ resources=None
+ ):
+ self.image = image
+ self.envs = envs or {}
+ self.cmds = cmds
+ self.args = args or []
+ self.secrets = secrets or []
+ self.result = result
+ self.labels = labels or {}
+ self.name = name
+ self.volumes = volumes or []
+ self.volume_mounts = volume_mounts or []
+ self.node_selectors = node_selectors or []
+ self.namespace = namespace
+ self.image_pull_policy = image_pull_policy
+ self.image_pull_secrets = image_pull_secrets
+ self.init_containers = init_containers
+ self.service_account_name = service_account_name
+ self.resources = resources or Resources()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
new file mode 100644
index 0000000..685be37
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow.contrib.kubernetes.pod import Pod
+import uuid
+
+
+class PodGenerator:
+ """Contains Kubernetes Airflow Worker configuration logic"""
+
+ def __init__(self, kube_config=None):
+ self.kube_config = kube_config
+ self.env_vars = {}
+ self.volumes = []
+ self.volume_mounts = []
+ self.init_containers = []
+ self.secrets = []
+
+ def add_init_container(self,
+ name,
+ image,
+ securityContext,
+ init_environment,
+ volume_mounts
+ ):
+ """
+
+ Adds an init container to the launched pod. useful for pre-
+
+ Args:
+ name (str):
+ image (str):
+ securityContext (dict):
+ init_environment (dict):
+ volume_mounts (dict):
+
+ Returns:
+
+ """
+ self.init_containers.append(
+ {
+ 'name': name,
+ 'image': image,
+ 'securityContext': securityContext,
+ 'env': init_environment,
+ 'volumeMounts': volume_mounts
+ }
+ )
+
+ def _get_init_containers(self):
+ return self.init_containers
+
+ def add_volume(self, name):
+ """
+
+ Args:
+ name (str):
+
+ Returns:
+
+ """
+ self.volumes.append({'name': name})
+
+ def add_volume_with_configmap(self, name, config_map):
+ self.volumes.append(
+ {
+ 'name': name,
+ 'configMap': config_map
+ }
+ )
+
+ def add_mount(self,
+ name,
+ mount_path,
+ sub_path,
+ read_only):
+ """
+
+ Args:
+ name (str):
+ mount_path (str):
+ sub_path (str):
+ read_only:
+
+ Returns:
+
+ """
+
+ self.volume_mounts.append({
+ 'name': name,
+ 'mountPath': mount_path,
+ 'subPath': sub_path,
+ 'readOnly': read_only
+ })
+
+ def _get_volumes_and_mounts(self):
+ return self.volumes, self.volume_mounts
+
+ def _get_image_pull_secrets(self):
+ """Extracts any image pull secrets for fetching container(s)"""
+ if not self.kube_config.image_pull_secrets:
+ return []
+ return self.kube_config.image_pull_secrets.split(',')
+
+ def make_pod(self, namespace, image, pod_id, cmds,
+ arguments, labels, kube_executor_config=None):
+ volumes, volume_mounts = self._get_volumes_and_mounts()
+ worker_init_container_spec = self._get_init_containers()
+
+ # resources = Resources(
+ # request_memory=kube_executor_config.request_memory,
+ # request_cpu=kube_executor_config.request_cpu,
+ # limit_memory=kube_executor_config.limit_memory,
+ # limit_cpu=kube_executor_config.limit_cpu
+ # )
+
+ return Pod(
+ namespace=namespace,
+ name=pod_id + "-" + str(uuid.uuid1())[:8],
+ image=image,
+ cmds=cmds,
+ args=arguments,
+ labels=labels,
+ envs=self.env_vars,
+ secrets={},
+ # service_account_name=self.kube_config.worker_service_account_name,
+ # image_pull_secrets=self.kube_config.image_pull_secrets,
+ init_containers=worker_init_container_spec,
+ volumes=volumes,
+ volume_mounts=volume_mounts,
+ resources=None
+ )
+
+
+'''
+This class is a necessary building block to the kubernetes executor, which will be PR'd
+shortly
+'''
+
+
+class WorkerGenerator(PodGenerator):
+ def __init__(self, kube_config):
+ PodGenerator.__init__(self, kube_config)
+ self.volumes, self.volume_mounts = self._init_volumes_and_mounts()
+ self.init_containers = self._init_init_containers()
+
+ def _init_volumes_and_mounts(self):
+ dags_volume_name = "airflow-dags"
+ dags_path = os.path.join(self.kube_config.dags_folder,
+ self.kube_config.git_subpath)
+ volumes = [{
+ 'name': dags_volume_name
+ }]
+ volume_mounts = [{
+ 'name': dags_volume_name,
+ 'mountPath': dags_path,
+ 'readOnly': True
+ }]
+
+ # Mount the airflow.cfg file via a configmap the user has specified
+ if self.kube_config.airflow_configmap:
+ config_volume_name = "airflow-config"
+ config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+ volumes.append({
+ 'name': config_volume_name,
+ 'configMap': {
+ 'name': self.kube_config.airflow_configmap
+ }
+ })
+ volume_mounts.append({
+ 'name': config_volume_name,
+ 'mountPath': config_path,
+ 'subPath': 'airflow.cfg',
+ 'readOnly': True
+ })
+
+ # A PV with the DAGs should be mounted
+ if self.kube_config.dags_volume_claim:
+ volumes[0]['persistentVolumeClaim'] = {
+ "claimName": self.kube_config.dags_volume_claim}
+ if self.kube_config.dags_volume_subpath:
+ volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+ else:
+ # Create a Shared Volume for the Git-Sync module to populate
+ volumes[0]["emptyDir"] = {}
+ return volumes, volume_mounts
+
+ def _init_labels(self, dag_id, task_id, execution_date):
+ return {
+ "airflow-slave": "",
+ "dag_id": dag_id,
+ "task_id": task_id,
+ "execution_date": execution_date
+ },
+
+ def _get_environment(self):
+ env = super(self, WorkerGenerator).env_vars
+ """Defines any necessary environment variables for the pod executor"""
+ env['AIRFLOW__CORE__DAGS_FOLDER'] = '/tmp/dags'
+ env['AIRFLOW__CORE__EXECUTOR'] = 'LocalExecutor'
+
+ if self.kube_config.airflow_configmap:
+ env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+ return env
+
+ def _init_init_containers(self, volume_mounts):
+ """When using git to retrieve the DAGs, use the GitSync Init Container"""
+ # If we're using volume claims to mount the dags, no init container is needed
+ if self.kube_config.dags_volume_claim:
+ return []
+
+ # Otherwise, define a git-sync init container
+ init_environment = [{
+ 'name': 'GIT_SYNC_REPO',
+ 'value': self.kube_config.git_repo
+ }, {
+ 'name': 'GIT_SYNC_BRANCH',
+ 'value': self.kube_config.git_branch
+ }, {
+ 'name': 'GIT_SYNC_ROOT',
+ 'value': '/tmp'
+ }, {
+ 'name': 'GIT_SYNC_DEST',
+ 'value': 'dags'
+ }, {
+ 'name': 'GIT_SYNC_ONE_TIME',
+ 'value': 'true'
+ }]
+ if self.kube_config.git_user:
+ init_environment.append({
+ 'name': 'GIT_SYNC_USERNAME',
+ 'value': self.kube_config.git_user
+ })
+ if self.kube_config.git_password:
+ init_environment.append({
+ 'name': 'GIT_SYNC_PASSWORD',
+ 'value': self.kube_config.git_password
+ })
+
+ volume_mounts[0]['readOnly'] = False
+ return [{
+ 'name': self.kube_config.git_sync_init_container_name,
+ 'image': self.kube_config.git_sync_container,
+ 'securityContext': {'runAsUser': 0},
+ 'env': init_environment,
+ 'volumeMounts': volume_mounts
+ }]
+
+ def make_worker_pod(self,
+ namespace,
+ pod_id,
+ dag_id,
+ task_id,
+ execution_date,
+ airflow_command,
+ kube_executor_config):
+ cmds = ["bash", "-cx", "--"]
+ labels = self._init_labels(dag_id, task_id, execution_date)
+ PodGenerator.make_pod(self,
+ namespace=namespace,
+ pod_id=pod_id,
+ cmds=cmds,
+ arguments=airflow_command,
+ labels=labels,
+ kube_executor_config=kube_executor_config)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
new file mode 100644
index 0000000..c910929
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -0,0 +1,119 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+from datetime import datetime as dt
+from airflow.contrib.kubernetes.kubernetes_request_factory import \
+ pod_request_factory as pod_fac
+from kubernetes import watch
+from kubernetes.client.rest import ApiException
+from airflow import AirflowException
+
+from .kube_client import get_kube_client
+
+
+class PodStatus(object):
+ # Lower-case pod phase names; PodLauncher.process_status lower-cases the
+ # reported phase and compares it against these values.
+ PENDING = 'pending'
+ RUNNING = 'running'
+ FAILED = 'failed'
+ SUCCEEDED = 'succeeded'
+
+
+class PodLauncher(LoggingMixin):
+ def __init__(self, kube_client=None):
+ super(PodLauncher, self).__init__()
+ self._client = kube_client or get_kube_client()
+ self._watch = watch.Watch()
+ self.kube_req_factory = pod_fac.SimplePodRequestFactory()
+
+ def run_pod_async(self, pod):
+ req = self.kube_req_factory.create(pod)
+ self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2)))
+ try:
+ resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+ self.log.debug('Pod Creation Response: {}'.format(resp))
+ except ApiException:
+ self.log.exception('Exception when attempting to create Namespaced Pod.')
+ raise
+ return resp
+
+ def run_pod(self, pod, startup_timeout=120):
+ # type: (Pod) -> State
+ """
+ Launches the pod synchronously and waits for completion.
+
+ Args:
+ pod (Pod):
+ startup_timeout (int): Timeout for startup of the pod (if pod is pending for
+ too long, considers task a failure
+ """
+ resp = self.run_pod_async(pod)
+ curr_time = dt.now()
+ if resp.status.start_time is None:
+ while self.pod_not_started(pod):
+ delta = dt.now() - curr_time
+ if delta.seconds >= startup_timeout:
+ raise AirflowException("Pod took too long to start")
+ time.sleep(1)
+ self.log.debug('Pod not yet started')
+
+ final_status = self._monitor_pod(pod)
+ return final_status
+
+ def _monitor_pod(self, pod):
+ # type: (Pod) -> State
+
+ while self.pod_is_running(pod):
+ self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
+ time.sleep(2)
+ return self._task_status(self.read_pod(pod))
+
+ def _task_status(self, event):
+ # type: (V1Pod) -> State
+ self.log.info(
+ "Event: {} had an event of type {}".format(event.metadata.name,
+ event.status.phase))
+ status = self.process_status(event.metadata.name, event.status.phase)
+ return status
+
+ def pod_not_started(self, pod):
+ state = self._task_status(self.read_pod(pod))
+ return state == State.QUEUED
+
+ def pod_is_running(self, pod):
+ state = self._task_status(self.read_pod(pod))
+ return state != State.SUCCESS and state != State.FAILED
+
+ def read_pod(self, pod):
+ return self._client.read_namespaced_pod(pod.name, pod.namespace)
+
+ def process_status(self, job_id, status):
+ status = status.lower()
+ if status == PodStatus.PENDING:
+ return State.QUEUED
+ elif status == PodStatus.FAILED:
+ self.log.info("Event: {} Failed".format(job_id))
+ return State.FAILED
+ elif status == PodStatus.SUCCEEDED:
+ self.log.info("Event: {} Succeeded".format(job_id))
+ return State.SUCCESS
+ elif status == PodStatus.RUNNING:
+ return State.RUNNING
+ else:
+ self.log.info("Event: Invalid state {} on job {}".format(status, job_id))
+ return State.FAILED
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
new file mode 100644
index 0000000..15f070e
--- /dev/null
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+ """Defines Kubernetes Secret Volume"""
+
+ def __init__(self, deploy_type, deploy_target, secret, key):
+ """Initialize a Kubernetes Secret Object. Used to track requested secrets from
+ the user.
+
+ :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+ `volume`
+ :type deploy_type: ``str``
+ :param deploy_target: The environment variable to be created in the worker.
+ :type deploy_target: ``str``
+ :param secret: Name of the secrets object in Kubernetes
+ :type secret: ``str``
+ :param key: Key of the secret within the Kubernetes Secret
+ :type key: ``str``
+ """
+ self.deploy_type = deploy_type
+ self.deploy_target = deploy_target.upper()
+ self.secret = secret
+ self.key = key
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
new file mode 100644
index 0000000..f09a25c
--- /dev/null
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
+from airflow.utils.state import State
+
+# NOTE(review): these three names are bound at module level, so they are not
+# attributes of KubernetesPodOperator below - confirm whether they were meant
+# to be declared inside the class body.
+template_fields = ('templates_dict',)
+template_ext = tuple()
+ui_color = '#ffefeb'
+
+
+class KubernetesPodOperator(BaseOperator):
+ def execute(self, context):
+ try:
+
+ client = kube_client.get_kube_client(in_cluster=self.in_cluster)
+ gen = pod_generator.PodGenerator()
+
+ pod = gen.make_pod(namespace=self.namespace,
+ image=self.image,
+ pod_id=self.name,
+ cmds=self.cmds,
+ arguments=self.arguments,
+ labels=self.labels,
+ kube_executor_config=self.kube_executor_config
+ )
+
+ launcher = pod_launcher.PodLauncher(client)
+ final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
+ if final_state != State.SUCCESS:
+ raise AirflowException("Pod returned a failure")
+ except AirflowException as ex:
+ raise AirflowException("Pod Launching failed: {error}".format(error=ex))
+
+ @apply_defaults
+ def __init__(self,
+ namespace,
+ image,
+ cmds,
+ arguments,
+ name,
+ in_cluster=False,
+ labels=None,
+ startup_timeout_seconds=120,
+ kube_executor_config=None,
+ *args,
+ **kwargs):
+ super(KubernetesPodOperator, self).__init__(*args, **kwargs)
+ self.kube_executor_config = kube_executor_config or {}
+ self.image = image
+ self.namespace = namespace
+ self.cmds = cmds
+ self.arguments = arguments
+ self.labels = labels or {}
+ self.startup_timeout_seconds = startup_timeout_seconds
+ self.name = name
+ self.in_cluster = in_cluster
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
new file mode 100755
index 0000000..f78cb3a
--- /dev/null
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+
+# Installs minikube and kubectl, starts a minikube cluster with the "none"
+# VM driver, and waits until kubectl can talk to its API server.
+#
+# The interpreter line has to be the first line of the file to have any
+# effect; this script uses bash-only syntax (`&>`, `{1..150}`).
+
+# Guard against a kubernetes cluster already being up
+if kubectl get pods &> /dev/null; then
+    echo "kubectl get pods returned 0 exit code, exiting early"
+    exit 0
+fi
+
+# Download the minikube and kubectl binaries and make them executable.
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+
+sudo mkdir -p /usr/local/bin
+sudo mv minikube /usr/local/bin/minikube
+sudo mv kubectl /usr/local/bin/kubectl
+
+export MINIKUBE_WANTUPDATENOTIFICATION=false
+export MINIKUBE_WANTREPORTERRORPROMPT=false
+export MINIKUBE_HOME=$HOME
+export CHANGE_MINIKUBE_NONE_USER=true
+# -p tolerates an already existing directory without hiding other errors.
+mkdir -p "$HOME/.kube"
+touch "$HOME/.kube/config"
+
+export KUBECONFIG=$HOME/.kube/config
+# -E keeps the variables exported above visible to minikube under sudo.
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+    echo "------- Running kubectl get pods -------"
+    # Stop waiting only when kubectl succeeds. The previous test
+    # (`$? -ne 1`) also stopped on any other failure status, e.g. 127 when
+    # kubectl cannot be executed at all.
+    if kubectl get po &> /dev/null; then
+        break
+    fi
+    sleep 2
+done
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/setup_kubernetes.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh
new file mode 100755
index 0000000..fa4e523
--- /dev/null
+++ b/scripts/ci/kubernetes/setup_kubernetes.sh
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+
+# Trace every command and stop at the first failing one.
+set -o xtrace
+set -e
+
+echo "This script downloads minikube, starts a driver=None minikube cluster, builds the airflow source and docker image, and then deploys airflow onto kubernetes"
+echo "For development, start minikube yourself (ie: minikube start) then run this script as you probably do not want a driver=None minikube cluster"
+
+# Absolute directory of this script. `&&` (rather than `;`) makes a failed
+# `cd` fail the assignment, so `set -e` aborts instead of continuing with
+# the caller's working directory.
+DIRNAME=$(cd "$(dirname "$0")" && pwd)
+
+# Quoted so that a checkout path containing whitespace is not word-split.
+"$DIRNAME/minikube/start_minikube.sh"
+
+echo "Airflow environment on kubernetes is good to go!"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 2b5a8c9..b6ed49c 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -93,3 +93,4 @@ thrift
thrift_sasl
unicodecsv
zdesk
+kubernetes
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
new file mode 100755
index 0000000..a51e742
--- /dev/null
+++ b/scripts/ci/travis_script.sh
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+
+# Runs the test suite through tox from the repository root.
+# Path and variable expansions are quoted so values containing whitespace
+# are passed as a single argument.
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+AIRFLOW_ROOT="$DIRNAME/../.."
+cd "$AIRFLOW_ROOT" && pip --version && ls -l "$HOME/.wheelhouse" && tox --version
+
+# NOTE(review): both branches run setup_kubernetes.sh; they differ only in
+# the arguments handed to tox. Confirm that the cluster setup is really
+# wanted when RUN_KUBE_INTEGRATION is empty.
+if [ -z "$RUN_KUBE_INTEGRATION" ];
+then
+    "$DIRNAME/kubernetes/setup_kubernetes.sh"
+    tox -e "$TOX_ENV"
+else
+    # Only run tox for the integration tests if the cluster setup succeeded.
+    "$DIRNAME/kubernetes/setup_kubernetes.sh" && \
+    tox -e "$TOX_ENV" -- tests.contrib.executors.integration \
+        --with-coverage \
+        --cover-erase \
+        --cover-html \
+        --cover-package=airflow \
+        --cover-html-dir=airflow/www/static/coverage \
+        --with-ignore-docstrings \
+        --rednose \
+        --with-timer \
+        -v \
+        --logging-level=DEBUG
+fi
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 77e79bf..ba5b031 100644
--- a/setup.py
+++ b/setup.py
@@ -162,6 +162,8 @@ github_enterprise = ['Flask-OAuthlib>=0.9.1']
qds = ['qds-sdk>=1.9.6']
cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
redis = ['redis>=2.10.5']
+kubernetes = ['kubernetes>=3.0.0',
+ 'cryptography>=2.0.0']
all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
devel = [
@@ -182,7 +184,8 @@ devel = [
]
devel_minreq = devel + mysql + doc + password + s3 + cgroups
devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
-devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh
+devel_all = (devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh +
+ kubernetes)
def do_setup():
@@ -278,6 +281,7 @@ def do_setup():
'webhdfs': webhdfs,
'jira': jira,
'redis': redis,
+ 'kubernetes': kubernetes
},
classifiers=[
'Development Status :: 5 - Production/Stable',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..205f183
--- /dev/null
+++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+    """
+    Exercises KubernetesPodOperator.execute end to end.
+
+    NOTE(review): the operator is created with its default in_cluster=False,
+    so these tests presumably need a cluster reachable through the local
+    kube configuration -- confirm against the CI setup.
+    """
+
+    def test_working_pod(self):
+        """A pod running a valid command finishes without raising."""
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        """An image name that cannot be started makes the task fail."""
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            # No trailing comma: the call is a plain statement, not a tuple.
+            k.execute(None)
+
+        # Report the exception that was raised, not the context manager.
+        print("exception: {}".format(cm.exception))
+
+    def test_pod_failure(self):
+        """
+        Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)
[15/16] incubator-airflow git commit: [AIRFLOW-1517] Kubernetes
operator PR fixes
Posted by bo...@apache.org.
[AIRFLOW-1517] Kubernetes operator PR fixes
Fix python flake8 linting issues and AIRFLOW license issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7fb5906e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7fb5906e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7fb5906e
Branch: refs/heads/master
Commit: 7fb5906e68fdf351e97acbf04f334b2a86081e81
Parents: d5b13a3
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Jan 11 16:24:23 2018 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:34 2018 -0800
----------------------------------------------------------------------
scripts/ci/kubernetes/minikube/start_minikube.sh | 3 +--
tests/contrib/minikube_tests/test_kubernetes_pod_operator.py | 5 +++--
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fb5906e/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 349b210..be370cf 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -48,7 +47,7 @@ start_minikube(){
do
echo "------- Running kubectl get pods -------"
STDERR=$(kubectl get pods 2>&1 >/dev/null)
- if [ $? -ne 1 ]; then
+ if [ $? -eq 0 ]; then
echo $STDERR
# We do not need dynamic hostpath provisioning, so disable the default storageclass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fb5906e/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index a9a8e97..4bbde8f 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,16 +19,17 @@ import unittest
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import AirflowException
from subprocess import check_call
-import logging
try:
check_call(["kubectl", "get", "pods"])
except Exception as e:
raise unittest.SkipTest(
- "Kubernetes integration tests require a minikube cluster; Skipping tests {}".format(e)
+ "Kubernetes integration tests require a minikube cluster;"
+ "Skipping tests {}".format(e)
)
+
class KubernetesPodOperatorTest(unittest.TestCase):
def test_working_pod(self):
[14/16] incubator-airflow git commit: [AIRFLOW-1517] addressed PR
comments
Posted by bo...@apache.org.
[AIRFLOW-1517] addressed PR comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d5b13a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d5b13a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d5b13a3d
Branch: refs/heads/master
Commit: d5b13a3dadb4632006fabfd96ea062dd70f612de
Parents: 12b725d
Author: Daniel Imberman <da...@gmail.com>
Authored: Tue Jan 2 09:38:30 2018 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:27 2018 -0800
----------------------------------------------------------------------
airflow/contrib/kubernetes/kube_client.py | 1 +
airflow/contrib/kubernetes/pod.py | 2 --
airflow/contrib/kubernetes/pod_launcher.py | 8 +++--
docs/kubernetes.rst | 28 ++++++---------
.../ci/kubernetes/minikube/start_minikube.sh | 36 ++++++++++----------
.../test_kubernetes_pod_operator.py | 6 ++--
6 files changed, 39 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index ecb3d55..d1a63a2 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -25,6 +25,7 @@ def _load_kube_config(in_cluster):
config.load_kube_config()
return client.CoreV1Api()
+
def get_kube_client(in_cluster=True):
# TODO: This should also allow people to point to a cluster.
return _load_kube_config(in_cluster)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d80b626..b4eb5a1 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -53,8 +53,6 @@ class Pod:
successful execution of the pod
:type result: any
"""
- pod_timeout = 3600
-
def __init__(
self,
image,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index d2d3af7..51f443b 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -25,7 +25,7 @@ from airflow.contrib.kubernetes.kubernetes_request_factory import \
from kubernetes import watch
from kubernetes.client.rest import ApiException
from airflow import AirflowException
-
+from requests.exceptions import HTTPError
from .kube_client import get_kube_client
@@ -102,7 +102,11 @@ class PodLauncher(LoggingMixin):
return state != State.SUCCESS and state != State.FAILED
def read_pod(self, pod):
- return self._client.read_namespaced_pod(pod.name, pod.namespace)
+ try:
+ return self._client.read_namespaced_pod(pod.name, pod.namespace)
+ except HTTPError as e:
+ raise AirflowException("There was an error reading the kubernetes API: {}"
+ .format(e))
def process_status(self, job_id, status):
status = status.lower()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 8d57028..21d8501 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -17,20 +17,14 @@ Kubernetes Operator
-+--------------+----------------------------------------------------------------+---------------+
-| name | description |
-+==============+================================================================+===============+
-| ``@namespace`` | The namespace is your isolated work environment within kubernetes|
-+--------------+----------------------------------------------------------------+---------------+
-| ``@image`` | docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@cmnds`` | To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container |
-+--------------+----------------------------------------------------------------+---------------+
-| ``arguments`` | arguments for your bash command |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@labels`` | Labels are an important element of launching kubernetes pods, as it tells
-| | kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} |
-| | and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@name`` | name of the task you want to run, will be used to generate a pod id |
-+--------------+----------------------------------------------------------------+---------------+
+================================= ====================================
+Variable Description
+================================= ====================================
+``@namespace`` The namespace is your isolated work environment within kubernetes
+``@image`` docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLs will point to custom repositories
+``@cmds`` To start a task in a docker image, we need to tell it what to do. The cmds array is the space-separated bash command that will define the task completed by the container
+``arguments`` arguments for your bash command
+``@labels`` Labels are an important element of launching kubernetes pods, as it tells kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service.
+``@name`` name of the task you want to run, will be used to generate a pod id
+================================= ====================================
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 1da23d0..349b210 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,21 +1,21 @@
-# Licensed to the Apache Software Foundation (ASF) under one *
-# or more contributor license agreements. See the NOTICE file *
-# distributed with this work for additional information *
-# regarding copyright ownership. The ASF licenses this file *
-# to you under the Apache License, Version 2.0 (the *
-# "License"); you may not use this file except in compliance *
-# with the License. You may obtain a copy of the License at *
-# *
-# http://www.apache.org/licenses/LICENSE-2.0 *
-# *
-# Unless required by applicable law or agreed to in writing, *
-# software distributed under the License is distributed on an *
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
-# KIND, either express or implied. See the License for the *
-# specific language governing permissions and limitations *
-# under the License. *
-
#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
# Guard against a kubernetes cluster already being up
kubectl get pods &> /dev/null
if [ $? -eq 0 ]; then
@@ -54,7 +54,7 @@ start_minikube(){
# We do not need dynamic hostpath provisioning, so disable the default storageclass
sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
- # We need to give permission to watch pods to the airflow scheduler.
+ # We need to give permission to watch pods to the airflow scheduler.
# The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default
exit 0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index 5cdd819..a9a8e97 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,16 +19,16 @@ import unittest
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import AirflowException
from subprocess import check_call
+import logging
try:
check_call(["kubectl", "get", "pods"])
-except:
+except Exception as e:
raise unittest.SkipTest(
- "Kubernetes integration tests require a minikube cluster; Skipping tests"
+ "Kubernetes integration tests require a minikube cluster; Skipping tests {}".format(e)
)
-
class KubernetesPodOperatorTest(unittest.TestCase):
def test_working_pod(self):
[07/16] incubator-airflow git commit: [AIRFLOW-1517] fixed license
issues
Posted by bo...@apache.org.
[AIRFLOW-1517] fixed license issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/02a93848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/02a93848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/02a93848
Branch: refs/heads/master
Commit: 02a93848fe285a5ba9ad81e8a122b36ed83398b8
Parents: eeff445
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 14:08:35 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:15 2018 -0800
----------------------------------------------------------------------
airflow/contrib/kubernetes/__init__.py | 25 ++++++++++---------
airflow/contrib/kubernetes/kube_client.py | 25 ++++++++++---------
.../kubernetes_request_factory/__init__.py | 24 ++++++++++--------
.../kubernetes_request_factory.py | 25 ++++++++++---------
.../pod_request_factory.py | 24 ++++++++++--------
airflow/contrib/kubernetes/pod.py | 26 +++++++++++---------
airflow/contrib/kubernetes/pod_generator.py | 25 ++++++++++---------
airflow/contrib/kubernetes/pod_launcher.py | 26 +++++++++++---------
airflow/contrib/kubernetes/secret.py | 25 ++++++++++---------
.../operators/kubernetes_pod_operator.py | 25 ++++++++++---------
10 files changed, 140 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index 9d7677a..13a8339 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -1,13 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 89de3e6..ecb3d55 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
def _load_kube_config(in_cluster):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
index 9921696..13a8339 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -1,12 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 7d698b4..0324781 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
from abc import ABCMeta, abstractmethod
import six
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 3be1a13..44b05dd 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -1,15 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import yaml
from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index cdb1d65..ba6ac06 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -1,17 +1,19 @@
-
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 685be37..cf85092 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import os
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index c910929..d2d3af7 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -1,17 +1,19 @@
-
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import json
import time
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 15f070e..ec5d51c 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
class Secret:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f09a25c..5d03875 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
[11/16] incubator-airflow git commit: [AIRFLOW-1517] Restore
authorship of resources
Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of resources
Collaboration authorship was lost when splitting up a PR; this commit adds back the code that was removed in the previous commit in order to restore authorship.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c9e3c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c9e3c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c9e3c1f
Branch: refs/heads/master
Commit: 7c9e3c1f849f5b12c7cb990b3f218aadab2f9586
Parents: 02a9384
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:39:48 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 33 ++++++++++++++++++--
airflow/contrib/kubernetes/pod.py | 19 +++++++++++
2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c9e3c1f/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 0324781..cbf3fce 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,6 +40,10 @@ class KubernetesRequestFactory:
def extract_image(pod, req):
req['spec']['containers'][0]['image'] = pod.image
+ @staticmethod
+ def extract_image_pull_policy(pod, req):
+ if pod.image_pull_policy:
+ req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
@staticmethod
def add_secret_to_env(env, secret):
@@ -72,7 +76,9 @@ class KubernetesRequestFactory:
if len(pod.node_selectors) > 0:
req['spec']['nodeSelector'] = pod.node_selectors
-
+ @staticmethod
+ def attach_volumes(pod, req):
+ req['spec']['volumes'] = pod.volumes
@staticmethod
def attach_volume_mounts(pod, req):
@@ -116,7 +122,30 @@ class KubernetesRequestFactory:
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env
-
+ @staticmethod
+ def extract_resources(pod, req):
+ if not pod.resources or pod.resources.is_empty_resource_request():
+ return
+
+ req['spec']['containers'][0]['resources'] = {}
+
+ if pod.resources.has_requests():
+ req['spec']['containers'][0]['resources']['requests'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'memory'] = pod.resources.request_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'cpu'] = pod.resources.request_cpu
+
+ if pod.resources.has_limits():
+ req['spec']['containers'][0]['resources']['limits'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'memory'] = pod.resources.limit_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'cpu'] = pod.resources.limit_cpu
@staticmethod
def extract_init_containers(pod, req):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c9e3c1f/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index ba6ac06..d80b626 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,7 +16,26 @@
# under the License.
+class Resources:
+ def __init__(
+ self,
+ request_memory=None,
+ request_cpu=None,
+ limit_memory=None,
+ limit_cpu=None):
+ self.request_memory = request_memory
+ self.request_cpu = request_cpu
+ self.limit_memory = limit_memory
+ self.limit_cpu = limit_cpu
+
+ def is_empty_resource_request(self):
+ return not self.has_limits() and not self.has_requests()
+
+ def has_limits(self):
+ return self.limit_cpu is not None or self.limit_memory is not None
+ def has_requests(self):
+ return self.request_cpu is not None or self.request_memory is not None
class Pod:
[05/16] incubator-airflow git commit: [AIRFLOW-1517] Created more
accurate failures for kube cluster issues
Posted by bo...@apache.org.
[AIRFLOW-1517] Created more accurate failures for kube cluster issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eeff4459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eeff4459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eeff4459
Branch: refs/heads/master
Commit: eeff445969743626516825aea5ea559195eeaae3
Parents: ada7aed
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 14:06:42 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:33 2018 -0800
----------------------------------------------------------------------
airflow/contrib/kubernetes/kube_client.py | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eeff4459/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 9db8641..89de3e6 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -19,13 +19,8 @@ def _load_kube_config(in_cluster):
config.load_incluster_config()
return client.CoreV1Api()
else:
- try:
- config.load_kube_config()
- return client.CoreV1Api()
- except NotImplementedError:
- NotImplementedError(
- "requires incluster config or defined configuration in airflow.cfg")
-
+ config.load_kube_config()
+ return client.CoreV1Api()
def get_kube_client(in_cluster=True):
# TODO: This should also allow people to point to a cluster.
[16/16] incubator-airflow git commit: Merge pull request #2853 from
dimberman/Airflow_1517_kubenetes_operator
Posted by bo...@apache.org.
Merge pull request #2853 from dimberman/Airflow_1517_kubenetes_operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1abe7f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1abe7f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1abe7f6d
Branch: refs/heads/master
Commit: 1abe7f6d5413b81569be97e7871a688e114f3c47
Parents: b48bbbd 7fb5906
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jan 12 19:02:52 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 12 19:02:52 2018 +0100
----------------------------------------------------------------------
.travis.yml | 14 +-
airflow/contrib/kubernetes/__init__.py | 16 ++
airflow/contrib/kubernetes/kube_client.py | 31 ++
.../kubernetes_request_factory/__init__.py | 16 ++
.../kubernetes_request_factory.py | 165 +++++++++++
.../pod_request_factory.py | 60 ++++
airflow/contrib/kubernetes/pod.py | 92 ++++++
airflow/contrib/kubernetes/pod_generator.py | 281 +++++++++++++++++++
airflow/contrib/kubernetes/pod_launcher.py | 125 +++++++++
airflow/contrib/kubernetes/secret.py | 39 +++
.../operators/kubernetes_pod_operator.py | 74 +++++
docs/kubernetes.rst | 30 ++
.../ci/kubernetes/minikube/start_minikube.sh | 80 ++++++
scripts/ci/kubernetes/setup_kubernetes.sh | 28 ++
scripts/ci/requirements.txt | 1 +
scripts/ci/run_tests.sh | 2 +-
scripts/ci/travis_script.sh | 38 +++
setup.py | 6 +-
tests/contrib/minikube_tests/__init__.py | 13 +
.../test_kubernetes_pod_operator.py | 79 ++++++
20 files changed, 1184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1abe7f6d/scripts/ci/requirements.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1abe7f6d/setup.py
----------------------------------------------------------------------
[12/16] incubator-airflow git commit: [AIRFLOW-1517] started
documentation of k8s operator
Posted by bo...@apache.org.
[AIRFLOW-1517] started documentation of k8s operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12b725df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12b725df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12b725df
Branch: refs/heads/master
Commit: 12b725df154e28511edaa85074614c8974175b0d
Parents: 28d9d7f
Author: Daniel Imberman <da...@gmail.com>
Authored: Tue Jan 2 09:22:57 2018 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:17 2018 -0800
----------------------------------------------------------------------
docs/kubernetes.rst | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12b725df/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
new file mode 100644
index 0000000..8d57028
--- /dev/null
+++ b/docs/kubernetes.rst
@@ -0,0 +1,36 @@
+Kubernetes Operator
+===================
+
+
+
+.. code:: python
+
+ from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+
+
++--------------+----------------------------------------------------------------+---------------+
+| name | description |
++==============+================================================================+===============+
+| ``@namespace`` | The namespace is your isolated work environment within kubernetes|
++--------------+----------------------------------------------------------------+---------------+
+| ``@image`` | docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLs will point to custom repositories |
++--------------+----------------------------------------------------------------+---------------+
+| ``@cmds`` | To start a task in a docker image, we need to tell it what to do. The cmds array is the space-separated bash command that will define the task completed by the container |
++--------------+----------------------------------------------------------------+---------------+
+| ``arguments`` | arguments for your bash command |
++--------------+----------------------------------------------------------------+---------------+
+| ``@labels`` | Labels are an important element of launching kubernetes pods, as it tells
+| | kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} |
+| | and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. |
++--------------+----------------------------------------------------------------+---------------+
+| ``@name`` | name of the task you want to run, will be used to generate a pod id |
++--------------+----------------------------------------------------------------+---------------+
[08/16] incubator-airflow git commit: [AIRFLOW-1517] Add minikube for
kubernetes integration tests
Posted by bo...@apache.org.
[AIRFLOW-1517] Add minikube for kubernetes integration tests
Add better support for minikube integration tests; By default minikube integration tests will run with kubernetes 1.7 and kubernetes 1.8
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/965439be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/965439be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/965439be
Branch: refs/heads/master
Commit: 965439bef04de4744796c8516ad8ff7e548639e5
Parents: a42dbb4
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Wed Dec 27 14:21:20 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800
----------------------------------------------------------------------
.travis.yml | 9 +++
airflow/contrib/kubernetes/kube_client.py | 8 +-
.../ci/kubernetes/minikube/start_minikube.sh | 58 +++++++++++----
scripts/ci/run_tests.sh | 2 +-
scripts/ci/travis_script.sh | 7 +-
tests/contrib/minikube_tests/__init__.py | 13 ++++
.../test_kubernetes_pod_operator.py | 78 ++++++++++++++++++++
.../operators/test_kubernetes_pod_operator.py | 69 -----------------
8 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6b45153..dec9181 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,6 +54,8 @@ env:
- TOX_ENV=py35-backend_sqlite
- TOX_ENV=py35-backend_postgres
- TOX_ENV=flake8
+ - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
matrix:
exclude:
- python: "3.5"
@@ -70,6 +72,13 @@ matrix:
env: TOX_ENV=py35-backend_postgres
- python: "2.7"
env: TOX_ENV=flake8
+ - python: "3.5"
+ env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - python: "3.5"
+ env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+ allow_failures:
+ - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
cache:
directories:
- $HOME/.wheelhouse/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index cd68caf..ecb3d55 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -16,17 +16,15 @@
# under the License.
-def load_kube_config(in_cluster=True):
+def _load_kube_config(in_cluster):
from kubernetes import config, client
if in_cluster:
config.load_incluster_config()
+ return client.CoreV1Api()
else:
config.load_kube_config()
return client.CoreV1Api()
def get_kube_client(in_cluster=True):
# TODO: This should also allow people to point to a cluster.
-
- from kubernetes import client
- load_kube_config(in_cluster)
- return client.CoreV1Api()
+ return _load_kube_config(in_cluster)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index f78cb3a..1da23d0 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations *
# under the License. *
-# Guard against a kubernetes cluster already being up
#!/usr/bin/env bash
+# Guard against a kubernetes cluster already being up
kubectl get pods &> /dev/null
if [ $? -eq 0 ]; then
echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
fi
#
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
sudo mkdir -p /usr/local/bin
sudo mv minikube /usr/local/bin/minikube
@@ -39,15 +39,43 @@ mkdir $HOME/.kube || true
touch $HOME/.kube/config
export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
- echo "------- Running kubectl get pods -------"
- kubectl get po &> /dev/null
- if [ $? -ne 1 ]; then
- break
- fi
- sleep 2
-done
+
+start_minikube(){
+ sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+ # this for loop waits until kubectl can access the api server that minikube has created
+ for i in {1..90} # timeout 3 minutes
+ do
+ echo "------- Running kubectl get pods -------"
+ STDERR=$(kubectl get pods 2>&1 >/dev/null)
+ if [ $? -ne 1 ]; then
+ echo $STDERR
+
+ # We do not need dynamic hostpath provisioning, so disable the default storageclass
+ sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+ # We need to give permission to watch pods to the airflow scheduler.
+ # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+ kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default
+ exit 0
+ fi
+ echo $STDERR
+ sleep 2
+ done
+}
+
+cleanup_minikube(){
+ sudo -E minikube stop
+ sudo -E minikube delete
+ docker stop $(docker ps -a -q) || true
+ docker rm $(docker ps -a -q) || true
+ sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 1253686..8c47ee8 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
if [[ "$SKIP_TESTS" != "true" ]]; then
echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
- ./run_unit_tests.sh
+ ./run_unit_tests.sh $@
fi
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index a51e742..86c086a 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
AIRFLOW_ROOT="$DIRNAME/../.."
cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
then
- $DIRNAME/kubernetes/setup_kubernetes.sh
tox -e $TOX_ENV
else
- $DIRNAME/kubernetes/setup_kubernetes.sh && \
- tox -e $TOX_ENV -- tests.contrib.executors.integration \
+ KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+ tox -e $TOX_ENV -- tests.contrib.minikube_tests \
--with-coverage \
--cover-erase \
--cover-html \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..5cdd819
--- /dev/null
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster; Skipping tests"
+ )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+ def test_working_pod(self):
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ k.execute(None)
+
+ def test_faulty_image(self):
+ bad_image_name = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image=bad_image_name,
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5
+ )
+ with self.assertRaises(AirflowException) as cm:
+ k.execute(None),
+
+ print("exception: {}".format(cm))
+
+ def test_pod_failure(self):
+ """
+ Tests that the task fails when a pod reports a failure
+ """
+
+ bad_internal_command = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=[bad_internal_command, "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ with self.assertRaises(AirflowException):
+ k.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
deleted file mode 100644
index 205f183..0000000
--- a/tests/contrib/operators/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
- def test_working_pod(self):
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- k.execute(None)
-
- def test_faulty_image(self):
- bad_image_name = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image=bad_image_name,
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- startup_timeout_seconds=5
- )
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
-
- print("exception: {}".format(cm))
-
- def test_pod_failure(self):
- """
- Tests that the task fails when a pod reports a failure
- """
-
- bad_internal_command = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- with self.assertRaises(AirflowException):
- k.execute(None)
[09/16] incubator-airflow git commit: [AIRFLOW-1517] Remove
authorship of resources
Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of resources
Collaborator authorship was lost when splitting up a PR; this commit removes code that will be re-added in the next commit to restore authorship.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/540b724a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/540b724a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/540b724a
Branch: refs/heads/master
Commit: 540b724a0d8c81ae589a33b4de302014da4f0228
Parents: 965439b
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:38:28 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 33 ++------------------
airflow/contrib/kubernetes/pod.py | 19 -----------
2 files changed, 2 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/540b724a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index cbf3fce..0324781 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,10 +40,6 @@ class KubernetesRequestFactory:
def extract_image(pod, req):
req['spec']['containers'][0]['image'] = pod.image
- @staticmethod
- def extract_image_pull_policy(pod, req):
- if pod.image_pull_policy:
- req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
@staticmethod
def add_secret_to_env(env, secret):
@@ -76,9 +72,7 @@ class KubernetesRequestFactory:
if len(pod.node_selectors) > 0:
req['spec']['nodeSelector'] = pod.node_selectors
- @staticmethod
- def attach_volumes(pod, req):
- req['spec']['volumes'] = pod.volumes
+
@staticmethod
def attach_volume_mounts(pod, req):
@@ -122,30 +116,7 @@ class KubernetesRequestFactory:
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env
- @staticmethod
- def extract_resources(pod, req):
- if not pod.resources or pod.resources.is_empty_resource_request():
- return
-
- req['spec']['containers'][0]['resources'] = {}
-
- if pod.resources.has_requests():
- req['spec']['containers'][0]['resources']['requests'] = {}
- if pod.resources.request_memory:
- req['spec']['containers'][0]['resources']['requests'][
- 'memory'] = pod.resources.request_memory
- if pod.resources.request_cpu:
- req['spec']['containers'][0]['resources']['requests'][
- 'cpu'] = pod.resources.request_cpu
-
- if pod.resources.has_limits():
- req['spec']['containers'][0]['resources']['limits'] = {}
- if pod.resources.request_memory:
- req['spec']['containers'][0]['resources']['limits'][
- 'memory'] = pod.resources.limit_memory
- if pod.resources.request_cpu:
- req['spec']['containers'][0]['resources']['limits'][
- 'cpu'] = pod.resources.limit_cpu
+
@staticmethod
def extract_init_containers(pod, req):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/540b724a/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d80b626..ba6ac06 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,26 +16,7 @@
# under the License.
-class Resources:
- def __init__(
- self,
- request_memory=None,
- request_cpu=None,
- limit_memory=None,
- limit_cpu=None):
- self.request_memory = request_memory
- self.request_cpu = request_cpu
- self.limit_memory = limit_memory
- self.limit_cpu = limit_cpu
-
- def is_empty_resource_request(self):
- return not self.has_limits() and not self.has_requests()
-
- def has_limits(self):
- return self.limit_cpu is not None or self.limit_memory is not None
- def has_requests(self):
- return self.request_cpu is not None or self.request_memory is not None
class Pod:
[04/16] incubator-airflow git commit: [AIRFLOW-1517] Add minikube for
kubernetes integration tests
Posted by bo...@apache.org.
[AIRFLOW-1517] Add minikube for kubernetes integration tests
Add better support for minikube integration tests. By default, minikube integration tests will run with Kubernetes 1.7 and Kubernetes 1.8.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cde3a5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cde3a5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cde3a5fe
Branch: refs/heads/master
Commit: cde3a5fecd825c90847bc563255707caafdd9336
Parents: 361dad9
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Wed Dec 27 14:21:20 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:32 2018 -0800
----------------------------------------------------------------------
.travis.yml | 9 +++
airflow/contrib/kubernetes/kube_client.py | 8 +-
.../ci/kubernetes/minikube/start_minikube.sh | 58 +++++++++++----
scripts/ci/run_tests.sh | 2 +-
scripts/ci/travis_script.sh | 7 +-
tests/contrib/minikube_tests/__init__.py | 13 ++++
.../test_kubernetes_pod_operator.py | 78 ++++++++++++++++++++
.../operators/test_kubernetes_pod_operator.py | 69 -----------------
8 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6b45153..dec9181 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,6 +54,8 @@ env:
- TOX_ENV=py35-backend_sqlite
- TOX_ENV=py35-backend_postgres
- TOX_ENV=flake8
+ - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
matrix:
exclude:
- python: "3.5"
@@ -70,6 +72,13 @@ matrix:
env: TOX_ENV=py35-backend_postgres
- python: "2.7"
env: TOX_ENV=flake8
+ - python: "3.5"
+ env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - python: "3.5"
+ env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+ allow_failures:
+ - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+ - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
cache:
directories:
- $HOME/.wheelhouse/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 7dc895e..9db8641 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -13,10 +13,11 @@
# limitations under the License.
-def load_kube_config(in_cluster=True):
+def _load_kube_config(in_cluster):
from kubernetes import config, client
if in_cluster:
config.load_incluster_config()
+ return client.CoreV1Api()
else:
try:
config.load_kube_config()
@@ -28,7 +29,4 @@ def load_kube_config(in_cluster=True):
def get_kube_client(in_cluster=True):
# TODO: This should also allow people to point to a cluster.
-
- from kubernetes import client
- load_kube_config(in_cluster)
- return client.CoreV1Api()
+ return _load_kube_config(in_cluster)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index f78cb3a..1da23d0 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations *
# under the License. *
-# Guard against a kubernetes cluster already being up
#!/usr/bin/env bash
+# Guard against a kubernetes cluster already being up
kubectl get pods &> /dev/null
if [ $? -eq 0 ]; then
echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
fi
#
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
sudo mkdir -p /usr/local/bin
sudo mv minikube /usr/local/bin/minikube
@@ -39,15 +39,43 @@ mkdir $HOME/.kube || true
touch $HOME/.kube/config
export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
- echo "------- Running kubectl get pods -------"
- kubectl get po &> /dev/null
- if [ $? -ne 1 ]; then
- break
- fi
- sleep 2
-done
+
+start_minikube(){
+ sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+ # this for loop waits until kubectl can access the api server that minikube has created
+ for i in {1..90} # timeout 3 minutes
+ do
+ echo "------- Running kubectl get pods -------"
+ STDERR=$(kubectl get pods 2>&1 >/dev/null)
+ if [ $? -ne 1 ]; then
+ echo $STDERR
+
+ # We do not need dynamic hostpath provisioning, so disable the default storageclass
+ sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+ # We need to give permission to watch pods to the airflow scheduler.
+ # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+ kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default
+ exit 0
+ fi
+ echo $STDERR
+ sleep 2
+ done
+}
+
+cleanup_minikube(){
+ sudo -E minikube stop
+ sudo -E minikube delete
+ docker stop $(docker ps -a -q) || true
+ docker rm $(docker ps -a -q) || true
+ sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 1253686..8c47ee8 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
if [[ "$SKIP_TESTS" != "true" ]]; then
echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
- ./run_unit_tests.sh
+ ./run_unit_tests.sh $@
fi
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index a51e742..86c086a 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
AIRFLOW_ROOT="$DIRNAME/../.."
cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
then
- $DIRNAME/kubernetes/setup_kubernetes.sh
tox -e $TOX_ENV
else
- $DIRNAME/kubernetes/setup_kubernetes.sh && \
- tox -e $TOX_ENV -- tests.contrib.executors.integration \
+ KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+ tox -e $TOX_ENV -- tests.contrib.minikube_tests \
--with-coverage \
--cover-erase \
--cover-html \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..18e614b
--- /dev/null
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call, CalledProcessError
+
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except CalledProcessError:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster; Skipping tests"
+ )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+ def test_working_pod(self):
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ k.execute(None)
+
+ def test_faulty_image(self):
+ bad_image_name = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image=bad_image_name,
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5
+ )
+ with self.assertRaises(AirflowException) as cm:
+ k.execute(None),
+
+ print("exception: {}".format(cm))
+
+ def test_pod_failure(self):
+ """
+ Tests that the task fails when a pod reports a failure
+ """
+
+ bad_internal_command = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=[bad_internal_command, "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ with self.assertRaises(AirflowException):
+ k.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
deleted file mode 100644
index 205f183..0000000
--- a/tests/contrib/operators/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
- def test_working_pod(self):
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- k.execute(None)
-
- def test_faulty_image(self):
- bad_image_name = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image=bad_image_name,
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- startup_timeout_seconds=5
- )
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
-
- print("exception: {}".format(cm))
-
- def test_pod_failure(self):
- """
- Tests that the task fails when a pod reports a failure
- """
-
- bad_internal_command = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- with self.assertRaises(AirflowException):
- k.execute(None)
[03/16] incubator-airflow git commit: [AIRFLOW-1517] Restore
authorship of secrets and init container
Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of secrets and init container
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/361dad95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/361dad95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/361dad95
Branch: refs/heads/master
Commit: 361dad957cca17eb37964c5945446bf62d74592f
Parents: c5ced07
Author: Benjamin Goldberg <be...@spothero.com>
Authored: Wed Dec 27 08:30:04 2017 -0600
Committer: Benjamin Goldberg <be...@spothero.com>
Committed: Wed Dec 27 08:44:39 2017 -0600
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 48 +++++++++
.../pod_request_factory.py | 4 +
airflow/contrib/kubernetes/pod.py | 3 +
airflow/contrib/kubernetes/pod_generator.py | 105 +++++++++++++++++++
airflow/contrib/kubernetes/secret.py | 36 +++++++
5 files changed, 196 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 88d3f32..9398bef 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -89,6 +89,37 @@ class KubernetesRequestFactory:
req['metadata']['name'] = pod.name
@staticmethod
+ def extract_volume_secrets(pod, req):
+ vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+ if any(vol_secrets):
+ req['spec']['containers'][0]['volumeMounts'] = []
+ req['spec']['volumes'] = []
+ for idx, vol in enumerate(vol_secrets):
+ vol_id = 'secretvol' + str(idx)
+ req['spec']['containers'][0]['volumeMounts'].append({
+ 'mountPath': vol.deploy_target,
+ 'name': vol_id,
+ 'readOnly': True
+ })
+ req['spec']['volumes'].append({
+ 'name': vol_id,
+ 'secret': {
+ 'secretName': vol.secret
+ }
+ })
+
+ @staticmethod
+ def extract_env_and_secrets(pod, req):
+ env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+ if len(pod.envs) > 0 or len(env_secrets) > 0:
+ env = []
+ for k in pod.envs.keys():
+ env.append({'name': k, 'value': pod.envs[k]})
+ for secret in env_secrets:
+ KubernetesRequestFactory.add_secret_to_env(env, secret)
+ req['spec']['containers'][0]['env'] = env
+
+ @staticmethod
def extract_resources(pod, req):
if not pod.resources or pod.resources.is_empty_resource_request():
return
@@ -112,3 +143,20 @@ class KubernetesRequestFactory:
if pod.resources.request_cpu:
req['spec']['containers'][0]['resources']['limits'][
'cpu'] = pod.resources.limit_cpu
+
+ @staticmethod
+ def extract_init_containers(pod, req):
+ if pod.init_containers:
+ req['spec']['initContainers'] = pod.init_containers
+
+ @staticmethod
+ def extract_service_account_name(pod, req):
+ if pod.service_account_name:
+ req['spec']['serviceAccountName'] = pod.service_account_name
+
+ @staticmethod
+ def extract_image_pull_secrets(pod, req):
+ if pod.image_pull_secrets:
+ req['spec']['imagePullSecrets'] = [{
+ 'name': pull_secret
+ } for pull_secret in pod.image_pull_secrets.split(',')]
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index ac9ded1..3be1a13 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,8 +45,12 @@ spec:
self.extract_cmds(pod, req)
self.extract_args(pod, req)
self.extract_node_selector(pod, req)
+ self.extract_env_and_secrets(pod, req)
self.extract_volume_secrets(pod, req)
self.attach_volumes(pod, req)
self.attach_volume_mounts(pod, req)
self.extract_resources(pod, req)
+ self.extract_service_account_name(pod, req)
+ self.extract_init_containers(pod, req)
+ self.extract_image_pull_secrets(pod, req)
return req
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index f6a0583..6a9f76d 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -86,4 +86,7 @@ class Pod:
self.node_selectors = node_selectors or []
self.namespace = namespace
self.image_pull_policy = image_pull_policy
+ self.image_pull_secrets = image_pull_secrets
+ self.init_containers = init_containers
+ self.service_account_name = service_account_name
self.resources = resources or Resources()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 69c6fcb..685be37 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -29,6 +29,39 @@ class PodGenerator:
self.init_containers = []
self.secrets = []
+ def add_init_container(self,
+ name,
+ image,
+ securityContext,
+ init_environment,
+ volume_mounts
+ ):
+ """
+
+ Adds an init container to the launched pod. Useful for setup that must finish before the main container starts (e.g. syncing DAGs).
+
+ Args:
+ name (str):
+ image (str):
+ securityContext (dict):
+ init_environment (dict):
+ volume_mounts (dict):
+
+ Returns:
+
+ """
+ self.init_containers.append(
+ {
+ 'name': name,
+ 'image': image,
+ 'securityContext': securityContext,
+ 'env': init_environment,
+ 'volumeMounts': volume_mounts
+ }
+ )
+
+ def _get_init_containers(self):
+ return self.init_containers
def add_volume(self, name):
"""
@@ -76,6 +109,12 @@ class PodGenerator:
def _get_volumes_and_mounts(self):
return self.volumes, self.volume_mounts
+ def _get_image_pull_secrets(self):
+ """Extracts any image pull secrets for fetching container(s)"""
+ if not self.kube_config.image_pull_secrets:
+ return []
+ return self.kube_config.image_pull_secrets.split(',')
+
def make_pod(self, namespace, image, pod_id, cmds,
arguments, labels, kube_executor_config=None):
volumes, volume_mounts = self._get_volumes_and_mounts()
@@ -97,6 +136,9 @@ class PodGenerator:
labels=labels,
envs=self.env_vars,
secrets={},
+ # service_account_name=self.kube_config.worker_service_account_name,
+ # image_pull_secrets=self.kube_config.image_pull_secrets,
+ init_containers=worker_init_container_spec,
volumes=volumes,
volume_mounts=volume_mounts,
resources=None
@@ -128,12 +170,32 @@ class WorkerGenerator(PodGenerator):
'readOnly': True
}]
+ # Mount the airflow.cfg file via a configmap the user has specified
+ if self.kube_config.airflow_configmap:
+ config_volume_name = "airflow-config"
+ config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+ volumes.append({
+ 'name': config_volume_name,
+ 'configMap': {
+ 'name': self.kube_config.airflow_configmap
+ }
+ })
+ volume_mounts.append({
+ 'name': config_volume_name,
+ 'mountPath': config_path,
+ 'subPath': 'airflow.cfg',
+ 'readOnly': True
+ })
+
# A PV with the DAGs should be mounted
if self.kube_config.dags_volume_claim:
volumes[0]['persistentVolumeClaim'] = {
"claimName": self.kube_config.dags_volume_claim}
if self.kube_config.dags_volume_subpath:
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+ else:
+ # Create a Shared Volume for the Git-Sync module to populate
+ volumes[0]["emptyDir"] = {}
return volumes, volume_mounts
def _init_labels(self, dag_id, task_id, execution_date):
@@ -154,6 +216,49 @@ class WorkerGenerator(PodGenerator):
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
return env
+ def _init_init_containers(self, volume_mounts):
+ """When using git to retrieve the DAGs, use the GitSync Init Container"""
+ # If we're using volume claims to mount the dags, no init container is needed
+ if self.kube_config.dags_volume_claim:
+ return []
+
+ # Otherwise, define a git-sync init container
+ init_environment = [{
+ 'name': 'GIT_SYNC_REPO',
+ 'value': self.kube_config.git_repo
+ }, {
+ 'name': 'GIT_SYNC_BRANCH',
+ 'value': self.kube_config.git_branch
+ }, {
+ 'name': 'GIT_SYNC_ROOT',
+ 'value': '/tmp'
+ }, {
+ 'name': 'GIT_SYNC_DEST',
+ 'value': 'dags'
+ }, {
+ 'name': 'GIT_SYNC_ONE_TIME',
+ 'value': 'true'
+ }]
+ if self.kube_config.git_user:
+ init_environment.append({
+ 'name': 'GIT_SYNC_USERNAME',
+ 'value': self.kube_config.git_user
+ })
+ if self.kube_config.git_password:
+ init_environment.append({
+ 'name': 'GIT_SYNC_PASSWORD',
+ 'value': self.kube_config.git_password
+ })
+
+ volume_mounts[0]['readOnly'] = False
+ return [{
+ 'name': self.kube_config.git_sync_init_container_name,
+ 'image': self.kube_config.git_sync_container,
+ 'securityContext': {'runAsUser': 0},
+ 'env': init_environment,
+ 'volumeMounts': volume_mounts
+ }]
+
def make_worker_pod(self,
namespace,
pod_id,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index e69de29..15f070e 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+ """Defines Kubernetes Secret Volume"""
+
+ def __init__(self, deploy_type, deploy_target, secret, key):
+ """Initialize a Kubernetes Secret Object. Used to track requested secrets from
+ the user.
+
+ :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+ `volume`
+ :type deploy_type: ``str``
+ :param deploy_target: The environment variable to be created in the worker.
+ :type deploy_target: ``str``
+ :param secret: Name of the secrets object in Kubernetes
+ :type secret: ``str``
+ :param key: Key of the secret within the Kubernetes Secret
+ :type key: ``str``
+ """
+ self.deploy_type = deploy_type
+ self.deploy_target = deploy_target.upper()
+ self.secret = secret
+ self.key = key
[13/16] incubator-airflow git commit: [AIRFLOW-1517] Restore
authorship of resources
Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of resources
Collaboration authors got destroyed when splitting up a PR. This commit adds back the code which was removed in the previous commit, to restore authorship.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/28d9d7f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/28d9d7f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/28d9d7f0
Branch: refs/heads/master
Commit: 28d9d7f00f3789e02702db969c1b19fdc16ef968
Parents: 540b724
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:39:48 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:17 2018 -0800
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 33 ++++++++++++++++++--
airflow/contrib/kubernetes/pod.py | 19 +++++++++++
2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28d9d7f0/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 0324781..cbf3fce 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,6 +40,10 @@ class KubernetesRequestFactory:
def extract_image(pod, req):
req['spec']['containers'][0]['image'] = pod.image
+ @staticmethod
+ def extract_image_pull_policy(pod, req):
+ if pod.image_pull_policy:
+ req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
@staticmethod
def add_secret_to_env(env, secret):
@@ -72,7 +76,9 @@ class KubernetesRequestFactory:
if len(pod.node_selectors) > 0:
req['spec']['nodeSelector'] = pod.node_selectors
-
+ @staticmethod
+ def attach_volumes(pod, req):
+ req['spec']['volumes'] = pod.volumes
@staticmethod
def attach_volume_mounts(pod, req):
@@ -116,7 +122,30 @@ class KubernetesRequestFactory:
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env
-
+ @staticmethod
+ def extract_resources(pod, req):
+ if not pod.resources or pod.resources.is_empty_resource_request():
+ return
+
+ req['spec']['containers'][0]['resources'] = {}
+
+ if pod.resources.has_requests():
+ req['spec']['containers'][0]['resources']['requests'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'memory'] = pod.resources.request_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['requests'][
+ 'cpu'] = pod.resources.request_cpu
+
+ if pod.resources.has_limits():
+ req['spec']['containers'][0]['resources']['limits'] = {}
+ if pod.resources.request_memory:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'memory'] = pod.resources.limit_memory
+ if pod.resources.request_cpu:
+ req['spec']['containers'][0]['resources']['limits'][
+ 'cpu'] = pod.resources.limit_cpu
@staticmethod
def extract_init_containers(pod, req):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28d9d7f0/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index ba6ac06..d80b626 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,7 +16,26 @@
# under the License.
+class Resources:
+ def __init__(
+ self,
+ request_memory=None,
+ request_cpu=None,
+ limit_memory=None,
+ limit_cpu=None):
+ self.request_memory = request_memory
+ self.request_cpu = request_cpu
+ self.limit_memory = limit_memory
+ self.limit_cpu = limit_cpu
+
+ def is_empty_resource_request(self):
+ return not self.has_limits() and not self.has_requests()
+
+ def has_limits(self):
+ return self.limit_cpu is not None or self.limit_memory is not None
+ def has_requests(self):
+ return self.request_cpu is not None or self.request_memory is not None
class Pod:
[10/16] incubator-airflow git commit: Revert "[AIRFLOW-1517] Add
minikube for kubernetes integration tests"
Posted by bo...@apache.org.
Revert "[AIRFLOW-1517] Add minikube for kubernetes integration tests"
This reverts commit 0197931609685a98181387014f7c8f3b5cd5f9a8.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a42dbb4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a42dbb4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a42dbb4f
Branch: refs/heads/master
Commit: a42dbb4f4d95abcbf91b50a9c408db6fa315daed
Parents: 7c9e3c1
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 13:23:27 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800
----------------------------------------------------------------------
.travis.yml | 9 ---
airflow/contrib/kubernetes/kube_client.py | 8 +-
.../ci/kubernetes/minikube/start_minikube.sh | 58 ++++-----------
scripts/ci/run_tests.sh | 2 +-
scripts/ci/travis_script.sh | 7 +-
tests/contrib/minikube_tests/__init__.py | 13 ----
.../test_kubernetes_pod_operator.py | 78 --------------------
.../operators/test_kubernetes_pod_operator.py | 69 +++++++++++++++++
8 files changed, 94 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index dec9181..6b45153 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,8 +54,6 @@ env:
- TOX_ENV=py35-backend_sqlite
- TOX_ENV=py35-backend_postgres
- TOX_ENV=flake8
- - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
- - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
matrix:
exclude:
- python: "3.5"
@@ -72,13 +70,6 @@ matrix:
env: TOX_ENV=py35-backend_postgres
- python: "2.7"
env: TOX_ENV=flake8
- - python: "3.5"
- env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
- - python: "3.5"
- env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- allow_failures:
- - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
- - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
cache:
directories:
- $HOME/.wheelhouse/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index ecb3d55..cd68caf 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -16,15 +16,17 @@
# under the License.
-def _load_kube_config(in_cluster):
+def load_kube_config(in_cluster=True):
from kubernetes import config, client
if in_cluster:
config.load_incluster_config()
- return client.CoreV1Api()
else:
config.load_kube_config()
return client.CoreV1Api()
def get_kube_client(in_cluster=True):
# TODO: This should also allow people to point to a cluster.
- return _load_kube_config(in_cluster)
+
+ from kubernetes import client
+ load_kube_config(in_cluster)
+ return client.CoreV1Api()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 1da23d0..f78cb3a 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations *
# under the License. *
-#!/usr/bin/env bash
# Guard against a kubernetes cluster already being up
+#!/usr/bin/env bash
kubectl get pods &> /dev/null
if [ $? -eq 0 ]; then
echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
fi
#
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
sudo mkdir -p /usr/local/bin
sudo mv minikube /usr/local/bin/minikube
@@ -39,43 +39,15 @@ mkdir $HOME/.kube || true
touch $HOME/.kube/config
export KUBECONFIG=$HOME/.kube/config
-
-start_minikube(){
- sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
-
- # this for loop waits until kubectl can access the api server that minikube has created
- for i in {1..90} # timeout 3 minutes
- do
- echo "------- Running kubectl get pods -------"
- STDERR=$(kubectl get pods 2>&1 >/dev/null)
- if [ $? -ne 1 ]; then
- echo $STDERR
-
- # We do not need dynamic hostpath provisioning, so disable the default storageclass
- sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
-
- # We need to give permission to watch pods to the airflow scheduler.
- # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
- kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default
- exit 0
- fi
- echo $STDERR
- sleep 2
- done
-}
-
-cleanup_minikube(){
- sudo -E minikube stop
- sudo -E minikube delete
- docker stop $(docker ps -a -q) || true
- docker rm $(docker ps -a -q) || true
- sleep 1
-}
-
-start_minikube
-echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
-cleanup_minikube
-start_minikube
-echo "Minikube cluster creation timedout a second time. Failing."
-
-exit 1
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+ echo "------- Running kubectl get pods -------"
+ kubectl get po &> /dev/null
+ if [ $? -ne 1 ]; then
+ break
+ fi
+ sleep 2
+done
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 8c47ee8..1253686 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
if [[ "$SKIP_TESTS" != "true" ]]; then
echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
- ./run_unit_tests.sh $@
+ ./run_unit_tests.sh
fi
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 86c086a..a51e742 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,12 +19,13 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
AIRFLOW_ROOT="$DIRNAME/../.."
cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
-if [ -z "$KUBERNETES_VERSION" ];
+if [ -z "$RUN_KUBE_INTEGRATION" ];
then
+ $DIRNAME/kubernetes/setup_kubernetes.sh
tox -e $TOX_ENV
else
- KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
- tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+ $DIRNAME/kubernetes/setup_kubernetes.sh && \
+ tox -e $TOX_ENV -- tests.contrib.executors.integration \
--with-coverage \
--cover-erase \
--cover-html \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 18e614b..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call, CalledProcessError
-
-
-try:
- check_call(["kubectl", "get", "pods"])
-except CalledProcessError:
- raise unittest.SkipTest(
- "Kubernetes integration tests require a minikube cluster; Skipping tests"
- )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
- def test_working_pod(self):
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- k.execute(None)
-
- def test_faulty_image(self):
- bad_image_name = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image=bad_image_name,
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- startup_timeout_seconds=5
- )
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
-
- print("exception: {}".format(cm))
-
- def test_pod_failure(self):
- """
- Tests that the task fails when a pod reports a failure
- """
-
- bad_internal_command = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- with self.assertRaises(AirflowException):
- k.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..205f183
--- /dev/null
+++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+ def test_working_pod(self):
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ k.execute(None)
+
+ def test_faulty_image(self):
+ bad_image_name = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image=bad_image_name,
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5
+ )
+ with self.assertRaises(AirflowException) as cm:
+ k.execute(None),
+
+ print("exception: {}".format(cm))
+
+ def test_pod_failure(self):
+ """
+ Tests that the task fails when a pod reports a failure
+ """
+
+ bad_internal_command = "foobar"
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=[bad_internal_command, "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+
+ with self.assertRaises(AirflowException):
+ k.execute(None)
[02/16] incubator-airflow git commit: [AIRFLOW-1517] Remove
authorship of secrets and init container
Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of secrets and init container
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5ced072
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5ced072
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5ced072
Branch: refs/heads/master
Commit: c5ced072b2199b3506abb4e4680e828f91066aaf
Parents: 78ff2fc
Author: Benjamin Goldberg <be...@spothero.com>
Authored: Wed Dec 27 08:27:48 2017 -0600
Committer: Benjamin Goldberg <be...@spothero.com>
Committed: Wed Dec 27 08:28:58 2017 -0600
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 48 ---------
.../pod_request_factory.py | 4 -
airflow/contrib/kubernetes/pod.py | 3 -
airflow/contrib/kubernetes/pod_generator.py | 105 -------------------
airflow/contrib/kubernetes/secret.py | 36 -------
5 files changed, 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 9398bef..88d3f32 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -89,37 +89,6 @@ class KubernetesRequestFactory:
req['metadata']['name'] = pod.name
@staticmethod
- def extract_volume_secrets(pod, req):
- vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
- if any(vol_secrets):
- req['spec']['containers'][0]['volumeMounts'] = []
- req['spec']['volumes'] = []
- for idx, vol in enumerate(vol_secrets):
- vol_id = 'secretvol' + str(idx)
- req['spec']['containers'][0]['volumeMounts'].append({
- 'mountPath': vol.deploy_target,
- 'name': vol_id,
- 'readOnly': True
- })
- req['spec']['volumes'].append({
- 'name': vol_id,
- 'secret': {
- 'secretName': vol.secret
- }
- })
-
- @staticmethod
- def extract_env_and_secrets(pod, req):
- env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
- if len(pod.envs) > 0 or len(env_secrets) > 0:
- env = []
- for k in pod.envs.keys():
- env.append({'name': k, 'value': pod.envs[k]})
- for secret in env_secrets:
- KubernetesRequestFactory.add_secret_to_env(env, secret)
- req['spec']['containers'][0]['env'] = env
-
- @staticmethod
def extract_resources(pod, req):
if not pod.resources or pod.resources.is_empty_resource_request():
return
@@ -143,20 +112,3 @@ class KubernetesRequestFactory:
if pod.resources.request_cpu:
req['spec']['containers'][0]['resources']['limits'][
'cpu'] = pod.resources.limit_cpu
-
- @staticmethod
- def extract_init_containers(pod, req):
- if pod.init_containers:
- req['spec']['initContainers'] = pod.init_containers
-
- @staticmethod
- def extract_service_account_name(pod, req):
- if pod.service_account_name:
- req['spec']['serviceAccountName'] = pod.service_account_name
-
- @staticmethod
- def extract_image_pull_secrets(pod, req):
- if pod.image_pull_secrets:
- req['spec']['imagePullSecrets'] = [{
- 'name': pull_secret
- } for pull_secret in pod.image_pull_secrets.split(',')]
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 3be1a13..ac9ded1 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,12 +45,8 @@ spec:
self.extract_cmds(pod, req)
self.extract_args(pod, req)
self.extract_node_selector(pod, req)
- self.extract_env_and_secrets(pod, req)
self.extract_volume_secrets(pod, req)
self.attach_volumes(pod, req)
self.attach_volume_mounts(pod, req)
self.extract_resources(pod, req)
- self.extract_service_account_name(pod, req)
- self.extract_init_containers(pod, req)
- self.extract_image_pull_secrets(pod, req)
return req
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 6a9f76d..f6a0583 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -86,7 +86,4 @@ class Pod:
self.node_selectors = node_selectors or []
self.namespace = namespace
self.image_pull_policy = image_pull_policy
- self.image_pull_secrets = image_pull_secrets
- self.init_containers = init_containers
- self.service_account_name = service_account_name
self.resources = resources or Resources()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 685be37..69c6fcb 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -29,39 +29,6 @@ class PodGenerator:
self.init_containers = []
self.secrets = []
- def add_init_container(self,
- name,
- image,
- securityContext,
- init_environment,
- volume_mounts
- ):
- """
-
- Adds an init container to the launched pod. useful for pre-
-
- Args:
- name (str):
- image (str):
- securityContext (dict):
- init_environment (dict):
- volume_mounts (dict):
-
- Returns:
-
- """
- self.init_containers.append(
- {
- 'name': name,
- 'image': image,
- 'securityContext': securityContext,
- 'env': init_environment,
- 'volumeMounts': volume_mounts
- }
- )
-
- def _get_init_containers(self):
- return self.init_containers
def add_volume(self, name):
"""
@@ -109,12 +76,6 @@ class PodGenerator:
def _get_volumes_and_mounts(self):
return self.volumes, self.volume_mounts
- def _get_image_pull_secrets(self):
- """Extracts any image pull secrets for fetching container(s)"""
- if not self.kube_config.image_pull_secrets:
- return []
- return self.kube_config.image_pull_secrets.split(',')
-
def make_pod(self, namespace, image, pod_id, cmds,
arguments, labels, kube_executor_config=None):
volumes, volume_mounts = self._get_volumes_and_mounts()
@@ -136,9 +97,6 @@ class PodGenerator:
labels=labels,
envs=self.env_vars,
secrets={},
- # service_account_name=self.kube_config.worker_service_account_name,
- # image_pull_secrets=self.kube_config.image_pull_secrets,
- init_containers=worker_init_container_spec,
volumes=volumes,
volume_mounts=volume_mounts,
resources=None
@@ -170,32 +128,12 @@ class WorkerGenerator(PodGenerator):
'readOnly': True
}]
- # Mount the airflow.cfg file via a configmap the user has specified
- if self.kube_config.airflow_configmap:
- config_volume_name = "airflow-config"
- config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
- volumes.append({
- 'name': config_volume_name,
- 'configMap': {
- 'name': self.kube_config.airflow_configmap
- }
- })
- volume_mounts.append({
- 'name': config_volume_name,
- 'mountPath': config_path,
- 'subPath': 'airflow.cfg',
- 'readOnly': True
- })
-
# A PV with the DAGs should be mounted
if self.kube_config.dags_volume_claim:
volumes[0]['persistentVolumeClaim'] = {
"claimName": self.kube_config.dags_volume_claim}
if self.kube_config.dags_volume_subpath:
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
- else:
- # Create a Shared Volume for the Git-Sync module to populate
- volumes[0]["emptyDir"] = {}
return volumes, volume_mounts
def _init_labels(self, dag_id, task_id, execution_date):
@@ -216,49 +154,6 @@ class WorkerGenerator(PodGenerator):
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
return env
- def _init_init_containers(self, volume_mounts):
- """When using git to retrieve the DAGs, use the GitSync Init Container"""
- # If we're using volume claims to mount the dags, no init container is needed
- if self.kube_config.dags_volume_claim:
- return []
-
- # Otherwise, define a git-sync init container
- init_environment = [{
- 'name': 'GIT_SYNC_REPO',
- 'value': self.kube_config.git_repo
- }, {
- 'name': 'GIT_SYNC_BRANCH',
- 'value': self.kube_config.git_branch
- }, {
- 'name': 'GIT_SYNC_ROOT',
- 'value': '/tmp'
- }, {
- 'name': 'GIT_SYNC_DEST',
- 'value': 'dags'
- }, {
- 'name': 'GIT_SYNC_ONE_TIME',
- 'value': 'true'
- }]
- if self.kube_config.git_user:
- init_environment.append({
- 'name': 'GIT_SYNC_USERNAME',
- 'value': self.kube_config.git_user
- })
- if self.kube_config.git_password:
- init_environment.append({
- 'name': 'GIT_SYNC_PASSWORD',
- 'value': self.kube_config.git_password
- })
-
- volume_mounts[0]['readOnly'] = False
- return [{
- 'name': self.kube_config.git_sync_init_container_name,
- 'image': self.kube_config.git_sync_container,
- 'securityContext': {'runAsUser': 0},
- 'env': init_environment,
- 'volumeMounts': volume_mounts
- }]
-
def make_worker_pod(self,
namespace,
pod_id,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 15f070e..e69de29 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,36 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class Secret:
- """Defines Kubernetes Secret Volume"""
-
- def __init__(self, deploy_type, deploy_target, secret, key):
- """Initialize a Kubernetes Secret Object. Used to track requested secrets from
- the user.
-
- :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
- `volume`
- :type deploy_type: ``str``
- :param deploy_target: The environment variable to be created in the worker.
- :type deploy_target: ``str``
- :param secret: Name of the secrets object in Kubernetes
- :type secret: ``str``
- :param key: Key of the secret within the Kubernetes Secret
- :type key: ``str``
- """
- self.deploy_type = deploy_type
- self.deploy_target = deploy_target.upper()
- self.secret = secret
- self.key = key
[06/16] incubator-airflow git commit: [AIRFLOW-1517] Remove
authorship of resources
Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of resources
Collaboration authors got destroyed when splitting up a PR; this commit removes code which will be re-added in the next commit to restore authorship.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ada7aed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ada7aed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ada7aed0
Branch: refs/heads/master
Commit: ada7aed0da1e84e2fc7b217295d6160a6764f787
Parents: cde3a5f
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:38:28 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:33 2018 -0800
----------------------------------------------------------------------
.../kubernetes_request_factory.py | 33 ++------------------
airflow/contrib/kubernetes/pod.py | 19 -----------
2 files changed, 2 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ada7aed0/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 9398bef..7d698b4 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -37,10 +37,6 @@ class KubernetesRequestFactory:
def extract_image(pod, req):
req['spec']['containers'][0]['image'] = pod.image
- @staticmethod
- def extract_image_pull_policy(pod, req):
- if pod.image_pull_policy:
- req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
@staticmethod
def add_secret_to_env(env, secret):
@@ -73,9 +69,7 @@ class KubernetesRequestFactory:
if len(pod.node_selectors) > 0:
req['spec']['nodeSelector'] = pod.node_selectors
- @staticmethod
- def attach_volumes(pod, req):
- req['spec']['volumes'] = pod.volumes
+
@staticmethod
def attach_volume_mounts(pod, req):
@@ -119,30 +113,7 @@ class KubernetesRequestFactory:
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env
- @staticmethod
- def extract_resources(pod, req):
- if not pod.resources or pod.resources.is_empty_resource_request():
- return
-
- req['spec']['containers'][0]['resources'] = {}
-
- if pod.resources.has_requests():
- req['spec']['containers'][0]['resources']['requests'] = {}
- if pod.resources.request_memory:
- req['spec']['containers'][0]['resources']['requests'][
- 'memory'] = pod.resources.request_memory
- if pod.resources.request_cpu:
- req['spec']['containers'][0]['resources']['requests'][
- 'cpu'] = pod.resources.request_cpu
-
- if pod.resources.has_limits():
- req['spec']['containers'][0]['resources']['limits'] = {}
- if pod.resources.request_memory:
- req['spec']['containers'][0]['resources']['limits'][
- 'memory'] = pod.resources.limit_memory
- if pod.resources.request_cpu:
- req['spec']['containers'][0]['resources']['limits'][
- 'cpu'] = pod.resources.limit_cpu
+
@staticmethod
def extract_init_containers(pod, req):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ada7aed0/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 6a9f76d..cdb1d65 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -14,26 +14,7 @@
# limitations under the License.
-class Resources:
- def __init__(
- self,
- request_memory=None,
- request_cpu=None,
- limit_memory=None,
- limit_cpu=None):
- self.request_memory = request_memory
- self.request_cpu = request_cpu
- self.limit_memory = limit_memory
- self.limit_cpu = limit_cpu
-
- def is_empty_resource_request(self):
- return not self.has_limits() and not self.has_requests()
-
- def has_limits(self):
- return self.limit_cpu is not None or self.limit_memory is not None
- def has_requests(self):
- return self.request_cpu is not None or self.request_memory is not None
class Pod: