You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 18:03:01 UTC

[01/16] incubator-airflow git commit: [AIRFLOW-1517] Kubernetes Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b48bbbd6f -> 1abe7f6d5


[AIRFLOW-1517] Kubernetes Operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78ff2fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78ff2fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78ff2fc1

Branch: refs/heads/master
Commit: 78ff2fc180808a38c53cc643bd87c509d7540b4a
Parents: c0dffb5
Author: Daniel Imberman <da...@gmail.com>
Authored: Thu Dec 7 09:41:05 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Tue Dec 26 08:45:31 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   5 +-
 airflow/contrib/kubernetes/__init__.py          |  13 +
 airflow/contrib/kubernetes/kube_client.py       |  34 +++
 .../kubernetes_request_factory/__init__.py      |  12 +
 .../kubernetes_request_factory.py               | 162 +++++++++++
 .../pod_request_factory.py                      |  56 ++++
 airflow/contrib/kubernetes/pod.py               |  92 ++++++
 airflow/contrib/kubernetes/pod_generator.py     | 278 +++++++++++++++++++
 airflow/contrib/kubernetes/pod_launcher.py      | 119 ++++++++
 airflow/contrib/kubernetes/secret.py            |  36 +++
 .../operators/kubernetes_pod_operator.py        |  71 +++++
 .../ci/kubernetes/minikube/start_minikube.sh    |  53 ++++
 scripts/ci/kubernetes/setup_kubernetes.sh       |  28 ++
 scripts/ci/requirements.txt                     |   1 +
 scripts/ci/travis_script.sh                     |  39 +++
 setup.py                                        |   6 +-
 .../operators/test_kubernetes_pod_operator.py   |  69 +++++
 17 files changed, 1069 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9b173a9..6b45153 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -89,9 +89,6 @@ before_script:
   - psql -c 'create database airflow;' -U postgres
   - export PATH=${PATH}:/tmp/hive/bin
 script:
-  - pip --version
-  - ls -l $HOME/.wheelhouse
-  - tox --version
-  - tox -e $TOX_ENV
+  - ./scripts/ci/travis_script.sh
 after_success:
   - codecov

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
new file mode 100644
index 0000000..7dc895e
--- /dev/null
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def load_kube_config(in_cluster=True):
+    from kubernetes import config, client
+    if in_cluster:
+        config.load_incluster_config()
+    else:
+        try:
+            config.load_kube_config()
+            return client.CoreV1Api()
+        except NotImplementedError:
+            NotImplementedError(
+                "requires incluster config or defined configuration in airflow.cfg")
+
+
+def get_kube_client(in_cluster=True):
+    # TODO: This should also allow people to point to a cluster.
+
+    from kubernetes import client
+    load_kube_config(in_cluster)
+    return client.CoreV1Api()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
new file mode 100644
index 0000000..9921696
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
new file mode 100644
index 0000000..9398bef
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta, abstractmethod
+import six
+
+
+class KubernetesRequestFactory:
+    """
+    Create requests to be sent to kube API.
+    Extend this class to talk to kubernetes and generate your specific resources.
+    This is equivalent of generating yaml files that can be used by `kubectl`
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def create(self, pod):
+        """
+        Creates the request for kubernetes API.
+
+        :param pod: The pod object
+        """
+        pass
+
+    @staticmethod
+    def extract_image(pod, req):
+        req['spec']['containers'][0]['image'] = pod.image
+
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        if pod.image_pull_policy:
+            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
+    @staticmethod
+    def add_secret_to_env(env, secret):
+        env.append({
+            'name': secret.deploy_target,
+            'valueFrom': {
+                'secretKeyRef': {
+                    'name': secret.secret,
+                    'key': secret.key
+                }
+            }
+        })
+
+    @staticmethod
+    def extract_labels(pod, req):
+        req['metadata']['labels'] = req['metadata'].get('labels', {})
+        for k, v in six.iteritems(pod.labels):
+            req['metadata']['labels'][k] = v
+
+    @staticmethod
+    def extract_cmds(pod, req):
+        req['spec']['containers'][0]['command'] = pod.cmds
+
+    @staticmethod
+    def extract_args(pod, req):
+        req['spec']['containers'][0]['args'] = pod.args
+
+    @staticmethod
+    def extract_node_selector(pod, req):
+        if len(pod.node_selectors) > 0:
+            req['spec']['nodeSelector'] = pod.node_selectors
+
+    @staticmethod
+    def attach_volumes(pod, req):
+        req['spec']['volumes'] = pod.volumes
+
+    @staticmethod
+    def attach_volume_mounts(pod, req):
+        if len(pod.volume_mounts) > 0:
+            req['spec']['containers'][0]['volumeMounts'] = (
+                req['spec']['containers'][0].get('volumeMounts', []))
+            req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
+
+    @staticmethod
+    def extract_name(pod, req):
+        req['metadata']['name'] = pod.name
+
+    @staticmethod
+    def extract_volume_secrets(pod, req):
+        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+        if any(vol_secrets):
+            req['spec']['containers'][0]['volumeMounts'] = []
+            req['spec']['volumes'] = []
+        for idx, vol in enumerate(vol_secrets):
+            vol_id = 'secretvol' + str(idx)
+            req['spec']['containers'][0]['volumeMounts'].append({
+                'mountPath': vol.deploy_target,
+                'name': vol_id,
+                'readOnly': True
+            })
+            req['spec']['volumes'].append({
+                'name': vol_id,
+                'secret': {
+                    'secretName': vol.secret
+                }
+            })
+
+    @staticmethod
+    def extract_env_and_secrets(pod, req):
+        env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+        if len(pod.envs) > 0 or len(env_secrets) > 0:
+            env = []
+            for k in pod.envs.keys():
+                env.append({'name': k, 'value': pod.envs[k]})
+            for secret in env_secrets:
+                KubernetesRequestFactory.add_secret_to_env(env, secret)
+            req['spec']['containers'][0]['env'] = env
+
+    @staticmethod
+    def extract_resources(pod, req):
+        if not pod.resources or pod.resources.is_empty_resource_request():
+            return
+
+        req['spec']['containers'][0]['resources'] = {}
+
+        if pod.resources.has_requests():
+            req['spec']['containers'][0]['resources']['requests'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'memory'] = pod.resources.request_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'cpu'] = pod.resources.request_cpu
+
+        if pod.resources.has_limits():
+            req['spec']['containers'][0]['resources']['limits'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'memory'] = pod.resources.limit_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'cpu'] = pod.resources.limit_cpu
+
+    @staticmethod
+    def extract_init_containers(pod, req):
+        if pod.init_containers:
+            req['spec']['initContainers'] = pod.init_containers
+
+    @staticmethod
+    def extract_service_account_name(pod, req):
+        if pod.service_account_name:
+            req['spec']['serviceAccountName'] = pod.service_account_name
+
+    @staticmethod
+    def extract_image_pull_secrets(pod, req):
+        if pod.image_pull_secrets:
+            req['spec']['imagePullSecrets'] = [{
+                'name': pull_secret
+            } for pull_secret in pod.image_pull_secrets.split(',')]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
new file mode 100644
index 0000000..3be1a13
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+
+import yaml
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+    import KubernetesRequestFactory
+
+
+class SimplePodRequestFactory(KubernetesRequestFactory):
+    """
+    Request generator for a simple pod.
+    """
+    _yaml = """apiVersion: v1
+kind: Pod
+metadata:
+  name: name
+spec:
+  containers:
+    - name: base
+      image: airflow-slave:latest
+      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
+  restartPolicy: Never
+    """
+
+    def __init__(self):
+        pass
+
+    def create(self, pod):
+        # type: (Pod) -> dict
+        req = yaml.load(self._yaml)
+        self.extract_name(pod, req)
+        self.extract_labels(pod, req)
+        self.extract_image(pod, req)
+        self.extract_image_pull_policy(pod, req)
+        self.extract_cmds(pod, req)
+        self.extract_args(pod, req)
+        self.extract_node_selector(pod, req)
+        self.extract_env_and_secrets(pod, req)
+        self.extract_volume_secrets(pod, req)
+        self.attach_volumes(pod, req)
+        self.attach_volume_mounts(pod, req)
+        self.extract_resources(pod, req)
+        self.extract_service_account_name(pod, req)
+        self.extract_init_containers(pod, req)
+        self.extract_image_pull_secrets(pod, req)
+        return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
new file mode 100644
index 0000000..6a9f76d
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod.py
@@ -0,0 +1,92 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Resources:
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def is_empty_resource_request(self):
+        return not self.has_limits() and not self.has_requests()
+
+    def has_limits(self):
+        return self.limit_cpu is not None or self.limit_memory is not None
+
+    def has_requests(self):
+        return self.request_cpu is not None or self.request_memory is not None
+
+
+class Pod:
+    """
+    Represents a kubernetes pod and manages execution of a single pod.
+    :param image: The docker image
+    :type image: str
+    :param env: A dict containing the environment variables
+    :type env: dict
+    :param cmds: The command to be run on the pod
+    :type cmd: list str
+    :param secrets: Secrets to be launched to the pod
+    :type secrets: list Secret
+    :param result: The result that will be returned to the operator after
+                   successful execution of the pod
+    :type result: any
+    """
+    pod_timeout = 3600
+
+    def __init__(
+            self,
+            image,
+            envs,
+            cmds,
+            args=None,
+            secrets=None,
+            labels=None,
+            node_selectors=None,
+            name=None,
+            volumes=None,
+            volume_mounts=None,
+            namespace='default',
+            result=None,
+            image_pull_policy="IfNotPresent",
+            image_pull_secrets=None,
+            init_containers=None,
+            service_account_name=None,
+            resources=None
+    ):
+        self.image = image
+        self.envs = envs or {}
+        self.cmds = cmds
+        self.args = args or []
+        self.secrets = secrets or []
+        self.result = result
+        self.labels = labels or {}
+        self.name = name
+        self.volumes = volumes or []
+        self.volume_mounts = volume_mounts or []
+        self.node_selectors = node_selectors or []
+        self.namespace = namespace
+        self.image_pull_policy = image_pull_policy
+        self.image_pull_secrets = image_pull_secrets
+        self.init_containers = init_containers
+        self.service_account_name = service_account_name
+        self.resources = resources or Resources()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
new file mode 100644
index 0000000..685be37
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow.contrib.kubernetes.pod import Pod
+import uuid
+
+
+class PodGenerator:
+    """Contains Kubernetes Airflow Worker configuration logic"""
+
+    def __init__(self, kube_config=None):
+        self.kube_config = kube_config
+        self.env_vars = {}
+        self.volumes = []
+        self.volume_mounts = []
+        self.init_containers = []
+        self.secrets = []
+
+    def add_init_container(self,
+                           name,
+                           image,
+                           securityContext,
+                           init_environment,
+                           volume_mounts
+                           ):
+        """
+
+        Adds an init container to the launched pod. useful for pre-
+
+        Args:
+            name (str):
+            image (str):
+            securityContext (dict):
+            init_environment (dict):
+            volume_mounts (dict):
+
+        Returns:
+
+        """
+        self.init_containers.append(
+            {
+                'name': name,
+                'image': image,
+                'securityContext': securityContext,
+                'env': init_environment,
+                'volumeMounts': volume_mounts
+            }
+        )
+
+    def _get_init_containers(self):
+        return self.init_containers
+
+    def add_volume(self, name):
+        """
+
+        Args:
+            name (str):
+
+        Returns:
+
+        """
+        self.volumes.append({'name': name})
+
+    def add_volume_with_configmap(self, name, config_map):
+        self.volumes.append(
+            {
+                'name': name,
+                'configMap': config_map
+            }
+        )
+
+    def add_mount(self,
+                  name,
+                  mount_path,
+                  sub_path,
+                  read_only):
+        """
+
+        Args:
+            name (str):
+            mount_path (str):
+            sub_path (str):
+            read_only:
+
+        Returns:
+
+        """
+
+        self.volume_mounts.append({
+            'name': name,
+            'mountPath': mount_path,
+            'subPath': sub_path,
+            'readOnly': read_only
+        })
+
+    def _get_volumes_and_mounts(self):
+        return self.volumes, self.volume_mounts
+
+    def _get_image_pull_secrets(self):
+        """Extracts any image pull secrets for fetching container(s)"""
+        if not self.kube_config.image_pull_secrets:
+            return []
+        return self.kube_config.image_pull_secrets.split(',')
+
+    def make_pod(self, namespace, image, pod_id, cmds,
+                 arguments, labels, kube_executor_config=None):
+        volumes, volume_mounts = self._get_volumes_and_mounts()
+        worker_init_container_spec = self._get_init_containers()
+
+        # resources = Resources(
+        #     request_memory=kube_executor_config.request_memory,
+        #     request_cpu=kube_executor_config.request_cpu,
+        #     limit_memory=kube_executor_config.limit_memory,
+        #     limit_cpu=kube_executor_config.limit_cpu
+        # )
+
+        return Pod(
+            namespace=namespace,
+            name=pod_id + "-" + str(uuid.uuid1())[:8],
+            image=image,
+            cmds=cmds,
+            args=arguments,
+            labels=labels,
+            envs=self.env_vars,
+            secrets={},
+            # service_account_name=self.kube_config.worker_service_account_name,
+            # image_pull_secrets=self.kube_config.image_pull_secrets,
+            init_containers=worker_init_container_spec,
+            volumes=volumes,
+            volume_mounts=volume_mounts,
+            resources=None
+        )
+
+
+'''
+This class is a necessary building block to the kubernetes executor, which will be PR'd
+shortly
+'''
+
+
+class WorkerGenerator(PodGenerator):
+    def __init__(self, kube_config):
+        PodGenerator.__init__(self, kube_config)
+        self.volumes, self.volume_mounts = self._init_volumes_and_mounts()
+        self.init_containers = self._init_init_containers()
+
+    def _init_volumes_and_mounts(self):
+        dags_volume_name = "airflow-dags"
+        dags_path = os.path.join(self.kube_config.dags_folder,
+                                 self.kube_config.git_subpath)
+        volumes = [{
+            'name': dags_volume_name
+        }]
+        volume_mounts = [{
+            'name': dags_volume_name,
+            'mountPath': dags_path,
+            'readOnly': True
+        }]
+
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = "airflow-config"
+            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            volumes.append({
+                'name': config_volume_name,
+                'configMap': {
+                    'name': self.kube_config.airflow_configmap
+                }
+            })
+            volume_mounts.append({
+                'name': config_volume_name,
+                'mountPath': config_path,
+                'subPath': 'airflow.cfg',
+                'readOnly': True
+            })
+
+        # A PV with the DAGs should be mounted
+        if self.kube_config.dags_volume_claim:
+            volumes[0]['persistentVolumeClaim'] = {
+                "claimName": self.kube_config.dags_volume_claim}
+            if self.kube_config.dags_volume_subpath:
+                volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+        else:
+            # Create a Shared Volume for the Git-Sync module to populate
+            volumes[0]["emptyDir"] = {}
+        return volumes, volume_mounts
+
+    def _init_labels(self, dag_id, task_id, execution_date):
+        return {
+            "airflow-slave": "",
+            "dag_id": dag_id,
+            "task_id": task_id,
+            "execution_date": execution_date
+        },
+
+    def _get_environment(self):
+        env = super(self, WorkerGenerator).env_vars
+        """Defines any necessary environment variables for the pod executor"""
+        env['AIRFLOW__CORE__DAGS_FOLDER'] = '/tmp/dags'
+        env['AIRFLOW__CORE__EXECUTOR'] = 'LocalExecutor'
+
+        if self.kube_config.airflow_configmap:
+            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+        return env
+
+    def _init_init_containers(self, volume_mounts):
+        """When using git to retrieve the DAGs, use the GitSync Init Container"""
+        # If we're using volume claims to mount the dags, no init container is needed
+        if self.kube_config.dags_volume_claim:
+            return []
+
+        # Otherwise, define a git-sync init container
+        init_environment = [{
+            'name': 'GIT_SYNC_REPO',
+            'value': self.kube_config.git_repo
+        }, {
+            'name': 'GIT_SYNC_BRANCH',
+            'value': self.kube_config.git_branch
+        }, {
+            'name': 'GIT_SYNC_ROOT',
+            'value': '/tmp'
+        }, {
+            'name': 'GIT_SYNC_DEST',
+            'value': 'dags'
+        }, {
+            'name': 'GIT_SYNC_ONE_TIME',
+            'value': 'true'
+        }]
+        if self.kube_config.git_user:
+            init_environment.append({
+                'name': 'GIT_SYNC_USERNAME',
+                'value': self.kube_config.git_user
+            })
+        if self.kube_config.git_password:
+            init_environment.append({
+                'name': 'GIT_SYNC_PASSWORD',
+                'value': self.kube_config.git_password
+            })
+
+        volume_mounts[0]['readOnly'] = False
+        return [{
+            'name': self.kube_config.git_sync_init_container_name,
+            'image': self.kube_config.git_sync_container,
+            'securityContext': {'runAsUser': 0},
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }]
+
+    def make_worker_pod(self,
+                        namespace,
+                        pod_id,
+                        dag_id,
+                        task_id,
+                        execution_date,
+                        airflow_command,
+                        kube_executor_config):
+        cmds = ["bash", "-cx", "--"]
+        labels = self._init_labels(dag_id, task_id, execution_date)
+        PodGenerator.make_pod(self,
+                              namespace=namespace,
+                              pod_id=pod_id,
+                              cmds=cmds,
+                              arguments=airflow_command,
+                              labels=labels,
+                              kube_executor_config=kube_executor_config)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
new file mode 100644
index 0000000..c910929
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -0,0 +1,119 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+from datetime import datetime as dt
+from airflow.contrib.kubernetes.kubernetes_request_factory import \
+    pod_request_factory as pod_fac
+from kubernetes import watch
+from kubernetes.client.rest import ApiException
+from airflow import AirflowException
+
+from .kube_client import get_kube_client
+
+
+class PodStatus(object):
+    PENDING = 'pending'
+    RUNNING = 'running'
+    FAILED = 'failed'
+    SUCCEEDED = 'succeeded'
+
+
+class PodLauncher(LoggingMixin):
+    def __init__(self, kube_client=None):
+        super(PodLauncher, self).__init__()
+        self._client = kube_client or get_kube_client()
+        self._watch = watch.Watch()
+        self.kube_req_factory = pod_fac.SimplePodRequestFactory()
+
+    def run_pod_async(self, pod):
+        req = self.kube_req_factory.create(pod)
+        self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2)))
+        try:
+            resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+            self.log.debug('Pod Creation Response: {}'.format(resp))
+        except ApiException:
+            self.log.exception('Exception when attempting to create Namespaced Pod.')
+            raise
+        return resp
+
+    def run_pod(self, pod, startup_timeout=120):
+        # type: (Pod) -> State
+        """
+        Launches the pod synchronously and waits for completion.
+
+        Args:
+            pod (Pod):
+            startup_timeout (int): Timeout for startup of the pod (if pod is pending for
+             too long, considers task a failure
+        """
+        resp = self.run_pod_async(pod)
+        curr_time = dt.now()
+        if resp.status.start_time is None:
+            while self.pod_not_started(pod):
+                delta = dt.now() - curr_time
+                if delta.seconds >= startup_timeout:
+                    raise AirflowException("Pod took too long to start")
+                time.sleep(1)
+            self.log.debug('Pod not yet started')
+
+        final_status = self._monitor_pod(pod)
+        return final_status
+
+    def _monitor_pod(self, pod):
+        # type: (Pod) -> State
+
+        while self.pod_is_running(pod):
+            self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
+            time.sleep(2)
+        return self._task_status(self.read_pod(pod))
+
+    def _task_status(self, event):
+        # type: (V1Pod) -> State
+        self.log.info(
+            "Event: {} had an event of type {}".format(event.metadata.name,
+                                                       event.status.phase))
+        status = self.process_status(event.metadata.name, event.status.phase)
+        return status
+
+    def pod_not_started(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state == State.QUEUED
+
+    def pod_is_running(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state != State.SUCCESS and state != State.FAILED
+
+    def read_pod(self, pod):
+        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+
+    def process_status(self, job_id, status):
+        status = status.lower()
+        if status == PodStatus.PENDING:
+            return State.QUEUED
+        elif status == PodStatus.FAILED:
+            self.log.info("Event: {} Failed".format(job_id))
+            return State.FAILED
+        elif status == PodStatus.SUCCEEDED:
+            self.log.info("Event: {} Succeeded".format(job_id))
+            return State.SUCCESS
+        elif status == PodStatus.RUNNING:
+            return State.RUNNING
+        else:
+            self.log.info("Event: Invalid state {} on job {}".format(status, job_id))
+            return State.FAILED

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
new file mode 100644
index 0000000..15f070e
--- /dev/null
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+    """Defines Kubernetes Secret Volume"""
+
+    def __init__(self, deploy_type, deploy_target, secret, key):
+        """Initialize a Kubernetes Secret Object. Used to track requested secrets from
+        the user.
+
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+            `volume`
+        :type deploy_type: ``str``
+        :param deploy_target: The environment variable to be created in the worker.
+        :type deploy_target: ``str``
+        :param secret: Name of the secrets object in Kubernetes
+        :type secret: ``str``
+        :param key: Key of the secret within the Kubernetes Secret
+        :type key: ``str``
+        """
+        self.deploy_type = deploy_type
+        self.deploy_target = deploy_target.upper()
+        self.secret = secret
+        self.key = key

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
new file mode 100644
index 0000000..f09a25c
--- /dev/null
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
+from airflow.utils.state import State
+
+template_fields = ('templates_dict',)
+template_ext = tuple()
+ui_color = '#ffefeb'
+
+
+class KubernetesPodOperator(BaseOperator):
+    def execute(self, context):
+        try:
+
+            client = kube_client.get_kube_client(in_cluster=self.in_cluster)
+            gen = pod_generator.PodGenerator()
+
+            pod = gen.make_pod(namespace=self.namespace,
+                               image=self.image,
+                               pod_id=self.name,
+                               cmds=self.cmds,
+                               arguments=self.arguments,
+                               labels=self.labels,
+                               kube_executor_config=self.kube_executor_config
+                               )
+
+            launcher = pod_launcher.PodLauncher(client)
+            final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
+            if final_state != State.SUCCESS:
+                raise AirflowException("Pod returned a failure")
+        except AirflowException as ex:
+            raise AirflowException("Pod Launching failed: {error}".format(error=ex))
+
+    @apply_defaults
+    def __init__(self,
+                 namespace,
+                 image,
+                 cmds,
+                 arguments,
+                 name,
+                 in_cluster=False,
+                 labels=None,
+                 startup_timeout_seconds=120,
+                 kube_executor_config=None,
+                 *args,
+                 **kwargs):
+        super(KubernetesPodOperator, self).__init__(*args, **kwargs)
+        self.kube_executor_config = kube_executor_config or {}
+        self.image = image
+        self.namespace = namespace
+        self.cmds = cmds
+        self.arguments = arguments
+        self.labels = labels or {}
+        self.startup_timeout_seconds = startup_timeout_seconds
+        self.name = name
+        self.in_cluster = in_cluster

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
new file mode 100755
index 0000000..f78cb3a
--- /dev/null
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -0,0 +1,53 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+# Guard against a kubernetes cluster already being up
+#!/usr/bin/env bash
+kubectl get pods &> /dev/null
+if [ $? -eq 0 ]; then
+  echo "kubectl get pods returned 0 exit code, exiting early"
+  exit 0
+fi
+#
+
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+
+sudo mkdir -p /usr/local/bin
+sudo mv minikube /usr/local/bin/minikube
+sudo mv kubectl /usr/local/bin/kubectl
+
+export MINIKUBE_WANTUPDATENOTIFICATION=false
+export MINIKUBE_WANTREPORTERRORPROMPT=false
+export MINIKUBE_HOME=$HOME
+export CHANGE_MINIKUBE_NONE_USER=true
+mkdir $HOME/.kube || true
+touch $HOME/.kube/config
+
+export KUBECONFIG=$HOME/.kube/config
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+  echo "------- Running kubectl get pods -------"
+  kubectl get po &> /dev/null
+  if [ $? -ne 1 ]; then
+    break
+  fi
+  sleep 2
+done

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/setup_kubernetes.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh
new file mode 100755
index 0000000..fa4e523
--- /dev/null
+++ b/scripts/ci/kubernetes/setup_kubernetes.sh
@@ -0,0 +1,28 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+set -o xtrace
+set -e
+
+echo "This script downloads minikube, starts a driver=None minikube cluster, builds the airflow source and docker image, and then deploys airflow onto kubernetes"
+echo "For development, start minikube yourself (ie: minikube start) then run this script as you probably do not want a driver=None minikube cluster"
+
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+
+$DIRNAME/minikube/start_minikube.sh
+
+echo "Airflow environment on kubernetes is good to go!"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 2b5a8c9..b6ed49c 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -93,3 +93,4 @@ thrift
 thrift_sasl
 unicodecsv
 zdesk
+kubernetes

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
new file mode 100755
index 0000000..a51e742
--- /dev/null
+++ b/scripts/ci/travis_script.sh
@@ -0,0 +1,39 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+AIRFLOW_ROOT="$DIRNAME/../.."
+cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
+
+if [ -z "$RUN_KUBE_INTEGRATION" ];
+then
+  $DIRNAME/kubernetes/setup_kubernetes.sh
+  tox -e $TOX_ENV
+else
+  $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+                     --with-coverage \
+                     --cover-erase \
+                     --cover-html \
+                     --cover-package=airflow \
+                     --cover-html-dir=airflow/www/static/coverage \
+                     --with-ignore-docstrings \
+                     --rednose \
+                     --with-timer \
+                     -v \
+                     --logging-level=DEBUG
+fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 77e79bf..ba5b031 100644
--- a/setup.py
+++ b/setup.py
@@ -162,6 +162,8 @@ github_enterprise = ['Flask-OAuthlib>=0.9.1']
 qds = ['qds-sdk>=1.9.6']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 redis = ['redis>=2.10.5']
+kubernetes = ['kubernetes>=3.0.0',
+              'cryptography>=2.0.0']
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
 devel = [
@@ -182,7 +184,8 @@ devel = [
 ]
 devel_minreq = devel + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
-devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh
+devel_all = (devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh +
+             kubernetes)
 
 
 def do_setup():
@@ -278,6 +281,7 @@ def do_setup():
             'webhdfs': webhdfs,
             'jira': jira,
             'redis': redis,
+            'kubernetes': kubernetes
         },
         classifiers=[
             'Development Status :: 5 - Production/Stable',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..205f183
--- /dev/null
+++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None),
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)


[15/16] incubator-airflow git commit: [AIRFLOW-1517] Kubernetes operator PR fixes

Posted by bo...@apache.org.
[AIRFLOW-1517] Kubernetes operator PR fixes

Fix python flake8 linting issues and AIRFLOW license issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7fb5906e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7fb5906e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7fb5906e

Branch: refs/heads/master
Commit: 7fb5906e68fdf351e97acbf04f334b2a86081e81
Parents: d5b13a3
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Jan 11 16:24:23 2018 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:34 2018 -0800

----------------------------------------------------------------------
 scripts/ci/kubernetes/minikube/start_minikube.sh             | 3 +--
 tests/contrib/minikube_tests/test_kubernetes_pod_operator.py | 5 +++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fb5906e/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 349b210..be370cf 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -48,7 +47,7 @@ start_minikube(){
   do
     echo "------- Running kubectl get pods -------"
     STDERR=$(kubectl get pods  2>&1 >/dev/null)
-    if [ $? -ne 1 ]; then
+    if [ $? -eq 0 ]; then
       echo $STDERR
 
       # We do not need dynamic hostpath provisioning, so disable the default storageclass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7fb5906e/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index a9a8e97..4bbde8f 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,16 +19,17 @@ import unittest
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow import AirflowException
 from subprocess import check_call
-import logging
 
 
 try:
     check_call(["kubectl", "get", "pods"])
 except Exception as e:
     raise unittest.SkipTest(
-        "Kubernetes integration tests require a minikube cluster; Skipping tests {}".format(e)
+        "Kubernetes integration tests require a minikube cluster;"
+        "Skipping tests {}".format(e)
     )
 
+
 class KubernetesPodOperatorTest(unittest.TestCase):
 
     def test_working_pod(self):


[14/16] incubator-airflow git commit: [AIRFLOW-1517] addressed PR comments

Posted by bo...@apache.org.
[AIRFLOW-1517] addressed PR comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d5b13a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d5b13a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d5b13a3d

Branch: refs/heads/master
Commit: d5b13a3dadb4632006fabfd96ea062dd70f612de
Parents: 12b725d
Author: Daniel Imberman <da...@gmail.com>
Authored: Tue Jan 2 09:38:30 2018 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:27 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py       |  1 +
 airflow/contrib/kubernetes/pod.py               |  2 --
 airflow/contrib/kubernetes/pod_launcher.py      |  8 +++--
 docs/kubernetes.rst                             | 28 ++++++---------
 .../ci/kubernetes/minikube/start_minikube.sh    | 36 ++++++++++----------
 .../test_kubernetes_pod_operator.py             |  6 ++--
 6 files changed, 39 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index ecb3d55..d1a63a2 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -25,6 +25,7 @@ def _load_kube_config(in_cluster):
         config.load_kube_config()
         return client.CoreV1Api()
 
+
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.
     return _load_kube_config(in_cluster)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d80b626..b4eb5a1 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -53,8 +53,6 @@ class Pod:
                    successful execution of the pod
     :type result: any
     """
-    pod_timeout = 3600
-
     def __init__(
             self,
             image,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index d2d3af7..51f443b 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -25,7 +25,7 @@ from airflow.contrib.kubernetes.kubernetes_request_factory import \
 from kubernetes import watch
 from kubernetes.client.rest import ApiException
 from airflow import AirflowException
-
+from requests.exceptions import HTTPError
 from .kube_client import get_kube_client
 
 
@@ -102,7 +102,11 @@ class PodLauncher(LoggingMixin):
         return state != State.SUCCESS and state != State.FAILED
 
     def read_pod(self, pod):
-        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        try:
+            return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        except HTTPError as e:
+            raise AirflowException("There was an error reading the kubernetes API: {}"
+                                   .format(e))
 
     def process_status(self, job_id, status):
         status = status.lower()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 8d57028..21d8501 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -17,20 +17,14 @@ Kubernetes Operator
 
 
 
-+--------------+----------------------------------------------------------------+---------------+
-| name       | description |
-+==============+================================================================+===============+
-| ``@namespace`` | The namespace is your isolated work environment within kubernetes|
-+--------------+----------------------------------------------------------------+---------------+
-| ``@image``    | docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@cmnds``  | To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container           |
-+--------------+----------------------------------------------------------------+---------------+
-| ``arguments``   | arguments for your bash command |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@labels``  | Labels are an important element of launching kubernetes pods, as it tells
-|               | kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} |
-|               | and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. |
-+--------------+----------------------------------------------------------------+---------------+
-| ``@name`` | name of the task you want to run, will be used to generate a pod id |
-+--------------+----------------------------------------------------------------+---------------+
+=================================   ====================================
+Variable                            Description
+=================================   ====================================
+``@namespace``                      The namespace is your isolated work environment within kubernetes
+``@image``                          docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories
+ ``@cmds``                           To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container
+``arguments``                       arguments for your bash command
+``@labels``                         Labels are an important element of launching kubernetes pods, as it tells kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label  {'postgres':'foo'} and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service.
+``@name``                           name of the task you want to run, will be used to generate a pod id
+=================================   ====================================
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 1da23d0..349b210 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,21 +1,21 @@
-#  Licensed to the Apache Software Foundation (ASF) under one   *
-#  or more contributor license agreements.  See the NOTICE file *
-#  distributed with this work for additional information        *
-#  regarding copyright ownership.  The ASF licenses this file   *
-#  to you under the Apache License, Version 2.0 (the            *
-#  "License"); you may not use this file except in compliance   *
-#  with the License.  You may obtain a copy of the License at   *
-#                                                               *
-#    http://www.apache.org/licenses/LICENSE-2.0                 *
-#                                                               *
-#  Unless required by applicable law or agreed to in writing,   *
-#  software distributed under the License is distributed on an  *
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-#  KIND, either express or implied.  See the License for the    *
-#  specific language governing permissions and limitations      *
-#  under the License.                                           *
-
 #!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 # Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
 if [ $? -eq 0 ]; then
@@ -54,7 +54,7 @@ start_minikube(){
       # We do not need dynamic hostpath provisioning, so disable the default storageclass
       sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
 
-      # We need to give permission to watch pods to the airflow scheduler. 
+      # We need to give permission to watch pods to the airflow scheduler.
       # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
       kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
       exit 0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
index 5cdd819..a9a8e97 100644
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -19,16 +19,16 @@ import unittest
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow import AirflowException
 from subprocess import check_call
+import logging
 
 
 try:
     check_call(["kubectl", "get", "pods"])
-except:
+except Exception as e:
     raise unittest.SkipTest(
-        "Kubernetes integration tests require a minikube cluster; Skipping tests"
+        "Kubernetes integration tests require a minikube cluster; Skipping tests {}".format(e)
     )
 
-
 class KubernetesPodOperatorTest(unittest.TestCase):
 
     def test_working_pod(self):


[07/16] incubator-airflow git commit: [AIRFLOW-1517] fixed license issues

Posted by bo...@apache.org.
[AIRFLOW-1517] fixed license issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/02a93848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/02a93848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/02a93848

Branch: refs/heads/master
Commit: 02a93848fe285a5ba9ad81e8a122b36ed83398b8
Parents: eeff445
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 14:08:35 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:15 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/kubernetes/__init__.py          | 25 ++++++++++---------
 airflow/contrib/kubernetes/kube_client.py       | 25 ++++++++++---------
 .../kubernetes_request_factory/__init__.py      | 24 ++++++++++--------
 .../kubernetes_request_factory.py               | 25 ++++++++++---------
 .../pod_request_factory.py                      | 24 ++++++++++--------
 airflow/contrib/kubernetes/pod.py               | 26 +++++++++++---------
 airflow/contrib/kubernetes/pod_generator.py     | 25 ++++++++++---------
 airflow/contrib/kubernetes/pod_launcher.py      | 26 +++++++++++---------
 airflow/contrib/kubernetes/secret.py            | 25 ++++++++++---------
 .../operators/kubernetes_pod_operator.py        | 25 ++++++++++---------
 10 files changed, 140 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index 9d7677a..13a8339 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -1,13 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 89de3e6..ecb3d55 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 def _load_kube_config(in_cluster):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
index 9921696..13a8339 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -1,12 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 7d698b4..0324781 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 from abc import ABCMeta, abstractmethod
 import six

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 3be1a13..44b05dd 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -1,15 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import yaml
 from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index cdb1d65..ba6ac06 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -1,17 +1,19 @@
-
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 685be37..cf85092 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import os
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index c910929..d2d3af7 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -1,17 +1,19 @@
-
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import json
 import time

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 15f070e..ec5d51c 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 class Secret:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02a93848/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f09a25c..5d03875 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator


[11/16] incubator-airflow git commit: [AIRFLOW-1517] Restore authorship of resources

Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of resources

Collaboration authors got destroyed when splitting up a PR, this commit adds back in the code which was be removed in the previous commit to restore authorship


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c9e3c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c9e3c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c9e3c1f

Branch: refs/heads/master
Commit: 7c9e3c1f849f5b12c7cb990b3f218aadab2f9586
Parents: 02a9384
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:39:48 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 33 ++++++++++++++++++--
 airflow/contrib/kubernetes/pod.py               | 19 +++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c9e3c1f/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 0324781..cbf3fce 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,6 +40,10 @@ class KubernetesRequestFactory:
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image
 
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        if pod.image_pull_policy:
+            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
 
     @staticmethod
     def add_secret_to_env(env, secret):
@@ -72,7 +76,9 @@ class KubernetesRequestFactory:
         if len(pod.node_selectors) > 0:
             req['spec']['nodeSelector'] = pod.node_selectors
 
-
+    @staticmethod
+    def attach_volumes(pod, req):
+        req['spec']['volumes'] = pod.volumes
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -116,7 +122,30 @@ class KubernetesRequestFactory:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
 
-
+    @staticmethod
+    def extract_resources(pod, req):
+        if not pod.resources or pod.resources.is_empty_resource_request():
+            return
+
+        req['spec']['containers'][0]['resources'] = {}
+
+        if pod.resources.has_requests():
+            req['spec']['containers'][0]['resources']['requests'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'memory'] = pod.resources.request_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'cpu'] = pod.resources.request_cpu
+
+        if pod.resources.has_limits():
+            req['spec']['containers'][0]['resources']['limits'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'memory'] = pod.resources.limit_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'cpu'] = pod.resources.limit_cpu
 
     @staticmethod
     def extract_init_containers(pod, req):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c9e3c1f/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index ba6ac06..d80b626 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,7 +16,26 @@
 # under the License.
 
 
+class Resources:
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def is_empty_resource_request(self):
+        return not self.has_limits() and not self.has_requests()
+
+    def has_limits(self):
+        return self.limit_cpu is not None or self.limit_memory is not None
 
+    def has_requests(self):
+        return self.request_cpu is not None or self.request_memory is not None
 
 
 class Pod:


[05/16] incubator-airflow git commit: [AIRFLOW-1517] Created more accurate failures for kube cluster issues

Posted by bo...@apache.org.
[AIRFLOW-1517] Created more accurate failures for kube cluster issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eeff4459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eeff4459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eeff4459

Branch: refs/heads/master
Commit: eeff445969743626516825aea5ea559195eeaae3
Parents: ada7aed
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 14:06:42 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:33 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eeff4459/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 9db8641..89de3e6 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -19,13 +19,8 @@ def _load_kube_config(in_cluster):
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        try:
-            config.load_kube_config()
-            return client.CoreV1Api()
-        except NotImplementedError:
-            NotImplementedError(
-                "requires incluster config or defined configuration in airflow.cfg")
-
+        config.load_kube_config()
+        return client.CoreV1Api()
 
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.


[16/16] incubator-airflow git commit: Merge pull request #2853 from dimberman/Airflow_1517_kubenetes_operator

Posted by bo...@apache.org.
Merge pull request #2853 from dimberman/Airflow_1517_kubenetes_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1abe7f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1abe7f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1abe7f6d

Branch: refs/heads/master
Commit: 1abe7f6d5413b81569be97e7871a688e114f3c47
Parents: b48bbbd 7fb5906
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jan 12 19:02:52 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 12 19:02:52 2018 +0100

----------------------------------------------------------------------
 .travis.yml                                     |  14 +-
 airflow/contrib/kubernetes/__init__.py          |  16 ++
 airflow/contrib/kubernetes/kube_client.py       |  31 ++
 .../kubernetes_request_factory/__init__.py      |  16 ++
 .../kubernetes_request_factory.py               | 165 +++++++++++
 .../pod_request_factory.py                      |  60 ++++
 airflow/contrib/kubernetes/pod.py               |  92 ++++++
 airflow/contrib/kubernetes/pod_generator.py     | 281 +++++++++++++++++++
 airflow/contrib/kubernetes/pod_launcher.py      | 125 +++++++++
 airflow/contrib/kubernetes/secret.py            |  39 +++
 .../operators/kubernetes_pod_operator.py        |  74 +++++
 docs/kubernetes.rst                             |  30 ++
 .../ci/kubernetes/minikube/start_minikube.sh    |  80 ++++++
 scripts/ci/kubernetes/setup_kubernetes.sh       |  28 ++
 scripts/ci/requirements.txt                     |   1 +
 scripts/ci/run_tests.sh                         |   2 +-
 scripts/ci/travis_script.sh                     |  38 +++
 setup.py                                        |   6 +-
 tests/contrib/minikube_tests/__init__.py        |  13 +
 .../test_kubernetes_pod_operator.py             |  79 ++++++
 20 files changed, 1184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1abe7f6d/scripts/ci/requirements.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1abe7f6d/setup.py
----------------------------------------------------------------------


[12/16] incubator-airflow git commit: [AIRFLOW-1517] started documentation of k8s operator

Posted by bo...@apache.org.
[AIRFLOW-1517] started documentation of k8s operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/12b725df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/12b725df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/12b725df

Branch: refs/heads/master
Commit: 12b725df154e28511edaa85074614c8974175b0d
Parents: 28d9d7f
Author: Daniel Imberman <da...@gmail.com>
Authored: Tue Jan 2 09:22:57 2018 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:17 2018 -0800

----------------------------------------------------------------------
 docs/kubernetes.rst | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/12b725df/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
new file mode 100644
index 0000000..8d57028
--- /dev/null
+++ b/docs/kubernetes.rst
@@ -0,0 +1,36 @@
+Kubernetes Operator
+=========
+
+
+
+.. code:: python
+
+    from airflow.comtrib.operators import KubernetesOperator
+    k = KubernetesPodOperator(namespace='default',
+                              image="ubuntu:16.04",
+                              cmds=["bash", "-cx"],
+                              arguments=["echo", "10"],
+                              labels={"foo": "bar"},
+                              name="test",
+                              task_id="task"
+                              )
+
+
+
++--------------+----------------------------------------------------------------+---------------+
+| name       | description |
++==============+================================================================+===============+
+| ``@namespace`` | The namespace is your isolated work environment within kubernetes|
++--------------+----------------------------------------------------------------+---------------+
+| ``@image``    | docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories |
++--------------+----------------------------------------------------------------+---------------+
+| ``@cmnds``  | To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container           |
++--------------+----------------------------------------------------------------+---------------+
+| ``arguments``   | arguments for your bash command |
++--------------+----------------------------------------------------------------+---------------+
+| ``@labels``  | Labels are an important element of launching kubernetes pods, as it tells
+|               | kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} |
+|               | and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. |
++--------------+----------------------------------------------------------------+---------------+
+| ``@name`` | name of the task you want to run, will be used to generate a pod id |
++--------------+----------------------------------------------------------------+---------------+


[08/16] incubator-airflow git commit: [AIRFLOW-1517] Add minikube for kubernetes integration tests

Posted by bo...@apache.org.
[AIRFLOW-1517] Add minikube for kubernetes integration tests

Add better support for minikube integration tests; By default minikube integration tests will run with kubernetes 1.7 and kubernetes 1.8


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/965439be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/965439be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/965439be

Branch: refs/heads/master
Commit: 965439bef04de4744796c8516ad8ff7e548639e5
Parents: a42dbb4
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Wed Dec 27 14:21:20 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  9 +++
 airflow/contrib/kubernetes/kube_client.py       |  8 +-
 .../ci/kubernetes/minikube/start_minikube.sh    | 58 +++++++++++----
 scripts/ci/run_tests.sh                         |  2 +-
 scripts/ci/travis_script.sh                     |  7 +-
 tests/contrib/minikube_tests/__init__.py        | 13 ++++
 .../test_kubernetes_pod_operator.py             | 78 ++++++++++++++++++++
 .../operators/test_kubernetes_pod_operator.py   | 69 -----------------
 8 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6b45153..dec9181 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,6 +54,8 @@ env:
     - TOX_ENV=py35-backend_sqlite
     - TOX_ENV=py35-backend_postgres
     - TOX_ENV=flake8
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
 matrix:
   exclude:
     - python: "3.5"
@@ -70,6 +72,13 @@ matrix:
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
       env: TOX_ENV=flake8
+    - python: "3.5"  
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - python: "3.5"
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 
+  allow_failures:
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0  
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index cd68caf..ecb3d55 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -16,17 +16,15 @@
 # under the License.
 
 
-def load_kube_config(in_cluster=True):
+def _load_kube_config(in_cluster):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
+        return client.CoreV1Api()
     else:
         config.load_kube_config()
         return client.CoreV1Api()
 
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.
-
-    from kubernetes import client
-    load_kube_config(in_cluster)
-    return client.CoreV1Api()
+    return _load_kube_config(in_cluster)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index f78cb3a..1da23d0 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
 #  specific language governing permissions and limitations      *
 #  under the License.                                           *
 
-# Guard against a kubernetes cluster already being up
 #!/usr/bin/env bash
+# Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
 if [ $? -eq 0 ]; then
   echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -39,15 +39,43 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
-  echo "------- Running kubectl get pods -------"
-  kubectl get po &> /dev/null
-  if [ $? -ne 1 ]; then
-    break
-  fi
-  sleep 2
-done
+
+start_minikube(){
+  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+  # this for loop waits until kubectl can access the api server that minikube has created
+  for i in {1..90} # timeout 3 minutes
+  do
+    echo "------- Running kubectl get pods -------"
+    STDERR=$(kubectl get pods  2>&1 >/dev/null)
+    if [ $? -ne 1 ]; then
+      echo $STDERR
+
+      # We do not need dynamic hostpath provisioning, so disable the default storageclass
+      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+      # We need to give permission to watch pods to the airflow scheduler. 
+      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
+      exit 0
+    fi
+    echo $STDERR
+    sleep 2
+  done
+}
+
+cleanup_minikube(){
+  sudo -E minikube stop
+  sudo -E minikube delete
+  docker stop $(docker ps -a -q) || true
+  docker rm $(docker ps -a -q) || true
+  sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 1253686..8c47ee8 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
 
 if [[ "$SKIP_TESTS" != "true" ]]; then
     echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
-    ./run_unit_tests.sh
+    ./run_unit_tests.sh $@
 fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index a51e742..86c086a 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
 then
-  $DIRNAME/kubernetes/setup_kubernetes.sh
   tox -e $TOX_ENV
 else
-  $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..5cdd819
--- /dev/null
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster; Skipping tests"
+    )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None),
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
deleted file mode 100644
index 205f183..0000000
--- a/tests/contrib/operators/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)


[09/16] incubator-airflow git commit: [AIRFLOW-1517] Remove authorship of resources

Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of resources

Collaboration authors got destroyed when splitting up a PR, this commit removes code which will be readded in the next commit to restore authorship


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/540b724a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/540b724a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/540b724a

Branch: refs/heads/master
Commit: 540b724a0d8c81ae589a33b4de302014da4f0228
Parents: 965439b
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:38:28 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 33 ++------------------
 airflow/contrib/kubernetes/pod.py               | 19 -----------
 2 files changed, 2 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/540b724a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index cbf3fce..0324781 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,10 +40,6 @@ class KubernetesRequestFactory:
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image
 
-    @staticmethod
-    def extract_image_pull_policy(pod, req):
-        if pod.image_pull_policy:
-            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
 
     @staticmethod
     def add_secret_to_env(env, secret):
@@ -76,9 +72,7 @@ class KubernetesRequestFactory:
         if len(pod.node_selectors) > 0:
             req['spec']['nodeSelector'] = pod.node_selectors
 
-    @staticmethod
-    def attach_volumes(pod, req):
-        req['spec']['volumes'] = pod.volumes
+
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -122,30 +116,7 @@ class KubernetesRequestFactory:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
 
-    @staticmethod
-    def extract_resources(pod, req):
-        if not pod.resources or pod.resources.is_empty_resource_request():
-            return
-
-        req['spec']['containers'][0]['resources'] = {}
-
-        if pod.resources.has_requests():
-            req['spec']['containers'][0]['resources']['requests'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'memory'] = pod.resources.request_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'cpu'] = pod.resources.request_cpu
-
-        if pod.resources.has_limits():
-            req['spec']['containers'][0]['resources']['limits'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'memory'] = pod.resources.limit_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'cpu'] = pod.resources.limit_cpu
+
 
     @staticmethod
     def extract_init_containers(pod, req):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/540b724a/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d80b626..ba6ac06 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,26 +16,7 @@
 # under the License.
 
 
-class Resources:
-    def __init__(
-            self,
-            request_memory=None,
-            request_cpu=None,
-            limit_memory=None,
-            limit_cpu=None):
-        self.request_memory = request_memory
-        self.request_cpu = request_cpu
-        self.limit_memory = limit_memory
-        self.limit_cpu = limit_cpu
-
-    def is_empty_resource_request(self):
-        return not self.has_limits() and not self.has_requests()
-
-    def has_limits(self):
-        return self.limit_cpu is not None or self.limit_memory is not None
 
-    def has_requests(self):
-        return self.request_cpu is not None or self.request_memory is not None
 
 
 class Pod:


[04/16] incubator-airflow git commit: [AIRFLOW-1517] Add minikube for kubernetes integration tests

Posted by bo...@apache.org.
[AIRFLOW-1517] Add minikube for kubernetes integration tests

Add better support for minikube integration tests; By default minikube integration tests will run with kubernetes 1.7 and kubernetes 1.8


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cde3a5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cde3a5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cde3a5fe

Branch: refs/heads/master
Commit: cde3a5fecd825c90847bc563255707caafdd9336
Parents: 361dad9
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Wed Dec 27 14:21:20 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:32 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  9 +++
 airflow/contrib/kubernetes/kube_client.py       |  8 +-
 .../ci/kubernetes/minikube/start_minikube.sh    | 58 +++++++++++----
 scripts/ci/run_tests.sh                         |  2 +-
 scripts/ci/travis_script.sh                     |  7 +-
 tests/contrib/minikube_tests/__init__.py        | 13 ++++
 .../test_kubernetes_pod_operator.py             | 78 ++++++++++++++++++++
 .../operators/test_kubernetes_pod_operator.py   | 69 -----------------
 8 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6b45153..dec9181 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,6 +54,8 @@ env:
     - TOX_ENV=py35-backend_sqlite
     - TOX_ENV=py35-backend_postgres
     - TOX_ENV=flake8
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
 matrix:
   exclude:
     - python: "3.5"
@@ -70,6 +72,13 @@ matrix:
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
       env: TOX_ENV=flake8
+    - python: "3.5"  
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - python: "3.5"
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 
+  allow_failures:
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0  
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 7dc895e..9db8641 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 
-def load_kube_config(in_cluster=True):
+def _load_kube_config(in_cluster):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
+        return client.CoreV1Api()
     else:
         try:
             config.load_kube_config()
@@ -28,7 +29,4 @@ def load_kube_config(in_cluster=True):
 
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.
-
-    from kubernetes import client
-    load_kube_config(in_cluster)
-    return client.CoreV1Api()
+    return _load_kube_config(in_cluster)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index f78cb3a..1da23d0 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
 #  specific language governing permissions and limitations      *
 #  under the License.                                           *
 
-# Guard against a kubernetes cluster already being up
 #!/usr/bin/env bash
+# Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
 if [ $? -eq 0 ]; then
   echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -39,15 +39,43 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
-  echo "------- Running kubectl get pods -------"
-  kubectl get po &> /dev/null
-  if [ $? -ne 1 ]; then
-    break
-  fi
-  sleep 2
-done
+
+start_minikube(){
+  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+  # this for loop waits until kubectl can access the api server that minikube has created
+  for i in {1..90} # timeout 3 minutes
+  do
+    echo "------- Running kubectl get pods -------"
+    STDERR=$(kubectl get pods  2>&1 >/dev/null)
+    if [ $? -ne 1 ]; then
+      echo $STDERR
+
+      # We do not need dynamic hostpath provisioning, so disable the default storageclass
+      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+      # We need to give permission to watch pods to the airflow scheduler. 
+      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
+      exit 0
+    fi
+    echo $STDERR
+    sleep 2
+  done
+}
+
+cleanup_minikube(){
+  sudo -E minikube stop
+  sudo -E minikube delete
+  docker stop $(docker ps -a -q) || true
+  docker rm $(docker ps -a -q) || true
+  sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 1253686..8c47ee8 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
 
 if [[ "$SKIP_TESTS" != "true" ]]; then
     echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
-    ./run_unit_tests.sh
+    ./run_unit_tests.sh $@
 fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index a51e742..86c086a 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
 then
-  $DIRNAME/kubernetes/setup_kubernetes.sh
   tox -e $TOX_ENV
 else
-  $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..18e614b
--- /dev/null
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call, CalledProcessError
+
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except CalledProcessError:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster; Skipping tests"
+    )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None),
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
deleted file mode 100644
index 205f183..0000000
--- a/tests/contrib/operators/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)


[03/16] incubator-airflow git commit: [AIRFLOW-1517] Restore authorship of secrets and init container

Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of secrets and init container


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/361dad95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/361dad95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/361dad95

Branch: refs/heads/master
Commit: 361dad957cca17eb37964c5945446bf62d74592f
Parents: c5ced07
Author: Benjamin Goldberg <be...@spothero.com>
Authored: Wed Dec 27 08:30:04 2017 -0600
Committer: Benjamin Goldberg <be...@spothero.com>
Committed: Wed Dec 27 08:44:39 2017 -0600

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               |  48 +++++++++
 .../pod_request_factory.py                      |   4 +
 airflow/contrib/kubernetes/pod.py               |   3 +
 airflow/contrib/kubernetes/pod_generator.py     | 105 +++++++++++++++++++
 airflow/contrib/kubernetes/secret.py            |  36 +++++++
 5 files changed, 196 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 88d3f32..9398bef 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -89,6 +89,37 @@ class KubernetesRequestFactory:
         req['metadata']['name'] = pod.name
 
     @staticmethod
+    def extract_volume_secrets(pod, req):
+        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+        if any(vol_secrets):
+            req['spec']['containers'][0]['volumeMounts'] = []
+            req['spec']['volumes'] = []
+        for idx, vol in enumerate(vol_secrets):
+            vol_id = 'secretvol' + str(idx)
+            req['spec']['containers'][0]['volumeMounts'].append({
+                'mountPath': vol.deploy_target,
+                'name': vol_id,
+                'readOnly': True
+            })
+            req['spec']['volumes'].append({
+                'name': vol_id,
+                'secret': {
+                    'secretName': vol.secret
+                }
+            })
+
+    @staticmethod
+    def extract_env_and_secrets(pod, req):
+        env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+        if len(pod.envs) > 0 or len(env_secrets) > 0:
+            env = []
+            for k in pod.envs.keys():
+                env.append({'name': k, 'value': pod.envs[k]})
+            for secret in env_secrets:
+                KubernetesRequestFactory.add_secret_to_env(env, secret)
+            req['spec']['containers'][0]['env'] = env
+
+    @staticmethod
     def extract_resources(pod, req):
         if not pod.resources or pod.resources.is_empty_resource_request():
             return
@@ -112,3 +143,20 @@ class KubernetesRequestFactory:
             if pod.resources.request_cpu:
                 req['spec']['containers'][0]['resources']['limits'][
                     'cpu'] = pod.resources.limit_cpu
+
+    @staticmethod
+    def extract_init_containers(pod, req):
+        if pod.init_containers:
+            req['spec']['initContainers'] = pod.init_containers
+
+    @staticmethod
+    def extract_service_account_name(pod, req):
+        if pod.service_account_name:
+            req['spec']['serviceAccountName'] = pod.service_account_name
+
+    @staticmethod
+    def extract_image_pull_secrets(pod, req):
+        if pod.image_pull_secrets:
+            req['spec']['imagePullSecrets'] = [{
+                'name': pull_secret
+            } for pull_secret in pod.image_pull_secrets.split(',')]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index ac9ded1..3be1a13 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,8 +45,12 @@ spec:
         self.extract_cmds(pod, req)
         self.extract_args(pod, req)
         self.extract_node_selector(pod, req)
+        self.extract_env_and_secrets(pod, req)
         self.extract_volume_secrets(pod, req)
         self.attach_volumes(pod, req)
         self.attach_volume_mounts(pod, req)
         self.extract_resources(pod, req)
+        self.extract_service_account_name(pod, req)
+        self.extract_init_containers(pod, req)
+        self.extract_image_pull_secrets(pod, req)
         return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index f6a0583..6a9f76d 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -86,4 +86,7 @@ class Pod:
         self.node_selectors = node_selectors or []
         self.namespace = namespace
         self.image_pull_policy = image_pull_policy
+        self.image_pull_secrets = image_pull_secrets
+        self.init_containers = init_containers
+        self.service_account_name = service_account_name
         self.resources = resources or Resources()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 69c6fcb..685be37 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -29,6 +29,39 @@ class PodGenerator:
         self.init_containers = []
         self.secrets = []
 
+    def add_init_container(self,
+                           name,
+                           image,
+                           securityContext,
+                           init_environment,
+                           volume_mounts
+                           ):
+        """
+
+        Adds an init container to the launched pod. useful for pre-
+
+        Args:
+            name (str):
+            image (str):
+            securityContext (dict):
+            init_environment (dict):
+            volume_mounts (dict):
+
+        Returns:
+
+        """
+        self.init_containers.append(
+            {
+                'name': name,
+                'image': image,
+                'securityContext': securityContext,
+                'env': init_environment,
+                'volumeMounts': volume_mounts
+            }
+        )
+
+    def _get_init_containers(self):
+        return self.init_containers
 
     def add_volume(self, name):
         """
@@ -76,6 +109,12 @@ class PodGenerator:
     def _get_volumes_and_mounts(self):
         return self.volumes, self.volume_mounts
 
+    def _get_image_pull_secrets(self):
+        """Extracts any image pull secrets for fetching container(s)"""
+        if not self.kube_config.image_pull_secrets:
+            return []
+        return self.kube_config.image_pull_secrets.split(',')
+
     def make_pod(self, namespace, image, pod_id, cmds,
                  arguments, labels, kube_executor_config=None):
         volumes, volume_mounts = self._get_volumes_and_mounts()
@@ -97,6 +136,9 @@ class PodGenerator:
             labels=labels,
             envs=self.env_vars,
             secrets={},
+            # service_account_name=self.kube_config.worker_service_account_name,
+            # image_pull_secrets=self.kube_config.image_pull_secrets,
+            init_containers=worker_init_container_spec,
             volumes=volumes,
             volume_mounts=volume_mounts,
             resources=None
@@ -128,12 +170,32 @@ class WorkerGenerator(PodGenerator):
             'readOnly': True
         }]
 
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = "airflow-config"
+            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            volumes.append({
+                'name': config_volume_name,
+                'configMap': {
+                    'name': self.kube_config.airflow_configmap
+                }
+            })
+            volume_mounts.append({
+                'name': config_volume_name,
+                'mountPath': config_path,
+                'subPath': 'airflow.cfg',
+                'readOnly': True
+            })
+
         # A PV with the DAGs should be mounted
         if self.kube_config.dags_volume_claim:
             volumes[0]['persistentVolumeClaim'] = {
                 "claimName": self.kube_config.dags_volume_claim}
             if self.kube_config.dags_volume_subpath:
                 volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+        else:
+            # Create a Shared Volume for the Git-Sync module to populate
+            volumes[0]["emptyDir"] = {}
         return volumes, volume_mounts
 
     def _init_labels(self, dag_id, task_id, execution_date):
@@ -154,6 +216,49 @@ class WorkerGenerator(PodGenerator):
             env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
         return env
 
+    def _init_init_containers(self, volume_mounts):
+        """When using git to retrieve the DAGs, use the GitSync Init Container"""
+        # If we're using volume claims to mount the dags, no init container is needed
+        if self.kube_config.dags_volume_claim:
+            return []
+
+        # Otherwise, define a git-sync init container
+        init_environment = [{
+            'name': 'GIT_SYNC_REPO',
+            'value': self.kube_config.git_repo
+        }, {
+            'name': 'GIT_SYNC_BRANCH',
+            'value': self.kube_config.git_branch
+        }, {
+            'name': 'GIT_SYNC_ROOT',
+            'value': '/tmp'
+        }, {
+            'name': 'GIT_SYNC_DEST',
+            'value': 'dags'
+        }, {
+            'name': 'GIT_SYNC_ONE_TIME',
+            'value': 'true'
+        }]
+        if self.kube_config.git_user:
+            init_environment.append({
+                'name': 'GIT_SYNC_USERNAME',
+                'value': self.kube_config.git_user
+            })
+        if self.kube_config.git_password:
+            init_environment.append({
+                'name': 'GIT_SYNC_PASSWORD',
+                'value': self.kube_config.git_password
+            })
+
+        volume_mounts[0]['readOnly'] = False
+        return [{
+            'name': self.kube_config.git_sync_init_container_name,
+            'image': self.kube_config.git_sync_container,
+            'securityContext': {'runAsUser': 0},
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }]
+
     def make_worker_pod(self,
                         namespace,
                         pod_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index e69de29..15f070e 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+    """Defines Kubernetes Secret Volume"""
+
+    def __init__(self, deploy_type, deploy_target, secret, key):
+        """Initialize a Kubernetes Secret Object. Used to track requested secrets from
+        the user.
+
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+            `volume`
+        :type deploy_type: ``str``
+        :param deploy_target: The environment variable to be created in the worker.
+        :type deploy_target: ``str``
+        :param secret: Name of the secrets object in Kubernetes
+        :type secret: ``str``
+        :param key: Key of the secret within the Kubernetes Secret
+        :type key: ``str``
+        """
+        self.deploy_type = deploy_type
+        self.deploy_target = deploy_target.upper()
+        self.secret = secret
+        self.key = key


[13/16] incubator-airflow git commit: [AIRFLOW-1517] Restore authorship of resources

Posted by bo...@apache.org.
[AIRFLOW-1517] Restore authorship of resources

Collaboration authors got destroyed when splitting up a PR, this commit adds back in the code which was be removed in the previous commit to restore authorship


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/28d9d7f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/28d9d7f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/28d9d7f0

Branch: refs/heads/master
Commit: 28d9d7f00f3789e02702db969c1b19fdc16ef968
Parents: 540b724
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:39:48 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:17 2018 -0800

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 33 ++++++++++++++++++--
 airflow/contrib/kubernetes/pod.py               | 19 +++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28d9d7f0/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 0324781..cbf3fce 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -40,6 +40,10 @@ class KubernetesRequestFactory:
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image
 
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        if pod.image_pull_policy:
+            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
 
     @staticmethod
     def add_secret_to_env(env, secret):
@@ -72,7 +76,9 @@ class KubernetesRequestFactory:
         if len(pod.node_selectors) > 0:
             req['spec']['nodeSelector'] = pod.node_selectors
 
-
+    @staticmethod
+    def attach_volumes(pod, req):
+        req['spec']['volumes'] = pod.volumes
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -116,7 +122,30 @@ class KubernetesRequestFactory:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
 
-
+    @staticmethod
+    def extract_resources(pod, req):
+        if not pod.resources or pod.resources.is_empty_resource_request():
+            return
+
+        req['spec']['containers'][0]['resources'] = {}
+
+        if pod.resources.has_requests():
+            req['spec']['containers'][0]['resources']['requests'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'memory'] = pod.resources.request_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'cpu'] = pod.resources.request_cpu
+
+        if pod.resources.has_limits():
+            req['spec']['containers'][0]['resources']['limits'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'memory'] = pod.resources.limit_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'cpu'] = pod.resources.limit_cpu
 
     @staticmethod
     def extract_init_containers(pod, req):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28d9d7f0/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index ba6ac06..d80b626 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -16,7 +16,26 @@
 # under the License.
 
 
+class Resources:
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def is_empty_resource_request(self):
+        return not self.has_limits() and not self.has_requests()
+
+    def has_limits(self):
+        return self.limit_cpu is not None or self.limit_memory is not None
 
+    def has_requests(self):
+        return self.request_cpu is not None or self.request_memory is not None
 
 
 class Pod:


[10/16] incubator-airflow git commit: Revert "[AIRFLOW-1517] Add minikube for kubernetes integration tests"

Posted by bo...@apache.org.
Revert "[AIRFLOW-1517] Add minikube for kubernetes integration tests"

This reverts commit 0197931609685a98181387014f7c8f3b5cd5f9a8.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a42dbb4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a42dbb4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a42dbb4f

Branch: refs/heads/master
Commit: a42dbb4f4d95abcbf91b50a9c408db6fa315daed
Parents: 7c9e3c1
Author: Daniel Imberman <da...@gmail.com>
Authored: Fri Dec 29 13:23:27 2017 -0800
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  9 ---
 airflow/contrib/kubernetes/kube_client.py       |  8 +-
 .../ci/kubernetes/minikube/start_minikube.sh    | 58 ++++-----------
 scripts/ci/run_tests.sh                         |  2 +-
 scripts/ci/travis_script.sh                     |  7 +-
 tests/contrib/minikube_tests/__init__.py        | 13 ----
 .../test_kubernetes_pod_operator.py             | 78 --------------------
 .../operators/test_kubernetes_pod_operator.py   | 69 +++++++++++++++++
 8 files changed, 94 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index dec9181..6b45153 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,8 +54,6 @@ env:
     - TOX_ENV=py35-backend_sqlite
     - TOX_ENV=py35-backend_postgres
     - TOX_ENV=flake8
-    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
 matrix:
   exclude:
     - python: "3.5"
@@ -72,13 +70,6 @@ matrix:
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
       env: TOX_ENV=flake8
-    - python: "3.5"  
-      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - python: "3.5"
-      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 
-  allow_failures:
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0  
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index ecb3d55..cd68caf 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -16,15 +16,17 @@
 # under the License.
 
 
-def _load_kube_config(in_cluster):
+def load_kube_config(in_cluster=True):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
-        return client.CoreV1Api()
     else:
         config.load_kube_config()
         return client.CoreV1Api()
 
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.
-    return _load_kube_config(in_cluster)
+
+    from kubernetes import client
+    load_kube_config(in_cluster)
+    return client.CoreV1Api()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 1da23d0..f78cb3a 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
 #  specific language governing permissions and limitations      *
 #  under the License.                                           *
 
-#!/usr/bin/env bash
 # Guard against a kubernetes cluster already being up
+#!/usr/bin/env bash
 kubectl get pods &> /dev/null
 if [ $? -eq 0 ]; then
   echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -39,43 +39,15 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-
-start_minikube(){
-  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
-
-  # this for loop waits until kubectl can access the api server that minikube has created
-  for i in {1..90} # timeout 3 minutes
-  do
-    echo "------- Running kubectl get pods -------"
-    STDERR=$(kubectl get pods  2>&1 >/dev/null)
-    if [ $? -ne 1 ]; then
-      echo $STDERR
-
-      # We do not need dynamic hostpath provisioning, so disable the default storageclass
-      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
-
-      # We need to give permission to watch pods to the airflow scheduler. 
-      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
-      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
-      exit 0
-    fi
-    echo $STDERR
-    sleep 2
-  done
-}
-
-cleanup_minikube(){
-  sudo -E minikube stop
-  sudo -E minikube delete
-  docker stop $(docker ps -a -q) || true
-  docker rm $(docker ps -a -q) || true
-  sleep 1
-}
-
-start_minikube
-echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
-cleanup_minikube
-start_minikube
-echo "Minikube cluster creation timedout a second time. Failing."
-
-exit 1
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+  echo "------- Running kubectl get pods -------"
+  kubectl get po &> /dev/null
+  if [ $? -ne 1 ]; then
+    break
+  fi
+  sleep 2
+done

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 8c47ee8..1253686 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
 
 if [[ "$SKIP_TESTS" != "true" ]]; then
     echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
-    ./run_unit_tests.sh $@
+    ./run_unit_tests.sh
 fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 86c086a..a51e742 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,12 +19,13 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$KUBERNETES_VERSION" ];
+if [ -z "$RUN_KUBE_INTEGRATION" ];
 then
+  $DIRNAME/kubernetes/setup_kubernetes.sh
   tox -e $TOX_ENV
 else
-  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+  $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.executors.integration \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 18e614b..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call, CalledProcessError
-
-
-try:
-    check_call(["kubectl", "get", "pods"])
-except CalledProcessError:
-    raise unittest.SkipTest(
-        "Kubernetes integration tests require a minikube cluster; Skipping tests"
-    )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a42dbb4f/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..205f183
--- /dev/null
+++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None),
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)


[02/16] incubator-airflow git commit: [AIRFLOW-1517] Remove authorship of secrets and init container

Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of secrets and init container


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c5ced072
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c5ced072
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c5ced072

Branch: refs/heads/master
Commit: c5ced072b2199b3506abb4e4680e828f91066aaf
Parents: 78ff2fc
Author: Benjamin Goldberg <be...@spothero.com>
Authored: Wed Dec 27 08:27:48 2017 -0600
Committer: Benjamin Goldberg <be...@spothero.com>
Committed: Wed Dec 27 08:28:58 2017 -0600

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               |  48 ---------
 .../pod_request_factory.py                      |   4 -
 airflow/contrib/kubernetes/pod.py               |   3 -
 airflow/contrib/kubernetes/pod_generator.py     | 105 -------------------
 airflow/contrib/kubernetes/secret.py            |  36 -------
 5 files changed, 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 9398bef..88d3f32 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -89,37 +89,6 @@ class KubernetesRequestFactory:
         req['metadata']['name'] = pod.name
 
     @staticmethod
-    def extract_volume_secrets(pod, req):
-        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
-        if any(vol_secrets):
-            req['spec']['containers'][0]['volumeMounts'] = []
-            req['spec']['volumes'] = []
-        for idx, vol in enumerate(vol_secrets):
-            vol_id = 'secretvol' + str(idx)
-            req['spec']['containers'][0]['volumeMounts'].append({
-                'mountPath': vol.deploy_target,
-                'name': vol_id,
-                'readOnly': True
-            })
-            req['spec']['volumes'].append({
-                'name': vol_id,
-                'secret': {
-                    'secretName': vol.secret
-                }
-            })
-
-    @staticmethod
-    def extract_env_and_secrets(pod, req):
-        env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
-        if len(pod.envs) > 0 or len(env_secrets) > 0:
-            env = []
-            for k in pod.envs.keys():
-                env.append({'name': k, 'value': pod.envs[k]})
-            for secret in env_secrets:
-                KubernetesRequestFactory.add_secret_to_env(env, secret)
-            req['spec']['containers'][0]['env'] = env
-
-    @staticmethod
     def extract_resources(pod, req):
         if not pod.resources or pod.resources.is_empty_resource_request():
             return
@@ -143,20 +112,3 @@ class KubernetesRequestFactory:
             if pod.resources.request_cpu:
                 req['spec']['containers'][0]['resources']['limits'][
                     'cpu'] = pod.resources.limit_cpu
-
-    @staticmethod
-    def extract_init_containers(pod, req):
-        if pod.init_containers:
-            req['spec']['initContainers'] = pod.init_containers
-
-    @staticmethod
-    def extract_service_account_name(pod, req):
-        if pod.service_account_name:
-            req['spec']['serviceAccountName'] = pod.service_account_name
-
-    @staticmethod
-    def extract_image_pull_secrets(pod, req):
-        if pod.image_pull_secrets:
-            req['spec']['imagePullSecrets'] = [{
-                'name': pull_secret
-            } for pull_secret in pod.image_pull_secrets.split(',')]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 3be1a13..ac9ded1 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,12 +45,8 @@ spec:
         self.extract_cmds(pod, req)
         self.extract_args(pod, req)
         self.extract_node_selector(pod, req)
-        self.extract_env_and_secrets(pod, req)
         self.extract_volume_secrets(pod, req)
         self.attach_volumes(pod, req)
         self.attach_volume_mounts(pod, req)
         self.extract_resources(pod, req)
-        self.extract_service_account_name(pod, req)
-        self.extract_init_containers(pod, req)
-        self.extract_image_pull_secrets(pod, req)
         return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 6a9f76d..f6a0583 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -86,7 +86,4 @@ class Pod:
         self.node_selectors = node_selectors or []
         self.namespace = namespace
         self.image_pull_policy = image_pull_policy
-        self.image_pull_secrets = image_pull_secrets
-        self.init_containers = init_containers
-        self.service_account_name = service_account_name
         self.resources = resources or Resources()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 685be37..69c6fcb 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -29,39 +29,6 @@ class PodGenerator:
         self.init_containers = []
         self.secrets = []
 
-    def add_init_container(self,
-                           name,
-                           image,
-                           securityContext,
-                           init_environment,
-                           volume_mounts
-                           ):
-        """
-
-        Adds an init container to the launched pod. useful for pre-
-
-        Args:
-            name (str):
-            image (str):
-            securityContext (dict):
-            init_environment (dict):
-            volume_mounts (dict):
-
-        Returns:
-
-        """
-        self.init_containers.append(
-            {
-                'name': name,
-                'image': image,
-                'securityContext': securityContext,
-                'env': init_environment,
-                'volumeMounts': volume_mounts
-            }
-        )
-
-    def _get_init_containers(self):
-        return self.init_containers
 
     def add_volume(self, name):
         """
@@ -109,12 +76,6 @@ class PodGenerator:
     def _get_volumes_and_mounts(self):
         return self.volumes, self.volume_mounts
 
-    def _get_image_pull_secrets(self):
-        """Extracts any image pull secrets for fetching container(s)"""
-        if not self.kube_config.image_pull_secrets:
-            return []
-        return self.kube_config.image_pull_secrets.split(',')
-
     def make_pod(self, namespace, image, pod_id, cmds,
                  arguments, labels, kube_executor_config=None):
         volumes, volume_mounts = self._get_volumes_and_mounts()
@@ -136,9 +97,6 @@ class PodGenerator:
             labels=labels,
             envs=self.env_vars,
             secrets={},
-            # service_account_name=self.kube_config.worker_service_account_name,
-            # image_pull_secrets=self.kube_config.image_pull_secrets,
-            init_containers=worker_init_container_spec,
             volumes=volumes,
             volume_mounts=volume_mounts,
             resources=None
@@ -170,32 +128,12 @@ class WorkerGenerator(PodGenerator):
             'readOnly': True
         }]
 
-        # Mount the airflow.cfg file via a configmap the user has specified
-        if self.kube_config.airflow_configmap:
-            config_volume_name = "airflow-config"
-            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
-            volumes.append({
-                'name': config_volume_name,
-                'configMap': {
-                    'name': self.kube_config.airflow_configmap
-                }
-            })
-            volume_mounts.append({
-                'name': config_volume_name,
-                'mountPath': config_path,
-                'subPath': 'airflow.cfg',
-                'readOnly': True
-            })
-
         # A PV with the DAGs should be mounted
         if self.kube_config.dags_volume_claim:
             volumes[0]['persistentVolumeClaim'] = {
                 "claimName": self.kube_config.dags_volume_claim}
             if self.kube_config.dags_volume_subpath:
                 volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
-        else:
-            # Create a Shared Volume for the Git-Sync module to populate
-            volumes[0]["emptyDir"] = {}
         return volumes, volume_mounts
 
     def _init_labels(self, dag_id, task_id, execution_date):
@@ -216,49 +154,6 @@ class WorkerGenerator(PodGenerator):
             env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
         return env
 
-    def _init_init_containers(self, volume_mounts):
-        """When using git to retrieve the DAGs, use the GitSync Init Container"""
-        # If we're using volume claims to mount the dags, no init container is needed
-        if self.kube_config.dags_volume_claim:
-            return []
-
-        # Otherwise, define a git-sync init container
-        init_environment = [{
-            'name': 'GIT_SYNC_REPO',
-            'value': self.kube_config.git_repo
-        }, {
-            'name': 'GIT_SYNC_BRANCH',
-            'value': self.kube_config.git_branch
-        }, {
-            'name': 'GIT_SYNC_ROOT',
-            'value': '/tmp'
-        }, {
-            'name': 'GIT_SYNC_DEST',
-            'value': 'dags'
-        }, {
-            'name': 'GIT_SYNC_ONE_TIME',
-            'value': 'true'
-        }]
-        if self.kube_config.git_user:
-            init_environment.append({
-                'name': 'GIT_SYNC_USERNAME',
-                'value': self.kube_config.git_user
-            })
-        if self.kube_config.git_password:
-            init_environment.append({
-                'name': 'GIT_SYNC_PASSWORD',
-                'value': self.kube_config.git_password
-            })
-
-        volume_mounts[0]['readOnly'] = False
-        return [{
-            'name': self.kube_config.git_sync_init_container_name,
-            'image': self.kube_config.git_sync_container,
-            'securityContext': {'runAsUser': 0},
-            'env': init_environment,
-            'volumeMounts': volume_mounts
-        }]
-
     def make_worker_pod(self,
                         namespace,
                         pod_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c5ced072/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index 15f070e..e69de29 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,36 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class Secret:
-    """Defines Kubernetes Secret Volume"""
-
-    def __init__(self, deploy_type, deploy_target, secret, key):
-        """Initialize a Kubernetes Secret Object. Used to track requested secrets from
-        the user.
-
-        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
-            `volume`
-        :type deploy_type: ``str``
-        :param deploy_target: The environment variable to be created in the worker.
-        :type deploy_target: ``str``
-        :param secret: Name of the secrets object in Kubernetes
-        :type secret: ``str``
-        :param key: Key of the secret within the Kubernetes Secret
-        :type key: ``str``
-        """
-        self.deploy_type = deploy_type
-        self.deploy_target = deploy_target.upper()
-        self.secret = secret
-        self.key = key


[06/16] incubator-airflow git commit: [AIRFLOW-1517] Remove authorship of resources

Posted by bo...@apache.org.
[AIRFLOW-1517] Remove authorship of resources

Collaboration authors got destroyed when splitting up a PR, this commit removes code which will be readded in the next commit to restore authorship


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ada7aed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ada7aed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ada7aed0

Branch: refs/heads/master
Commit: ada7aed0da1e84e2fc7b217295d6160a6764f787
Parents: cde3a5f
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Thu Dec 28 14:38:28 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:28:33 2018 -0800

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 33 ++------------------
 airflow/contrib/kubernetes/pod.py               | 19 -----------
 2 files changed, 2 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ada7aed0/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 9398bef..7d698b4 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -37,10 +37,6 @@ class KubernetesRequestFactory:
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image
 
-    @staticmethod
-    def extract_image_pull_policy(pod, req):
-        if pod.image_pull_policy:
-            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
 
     @staticmethod
     def add_secret_to_env(env, secret):
@@ -73,9 +69,7 @@ class KubernetesRequestFactory:
         if len(pod.node_selectors) > 0:
             req['spec']['nodeSelector'] = pod.node_selectors
 
-    @staticmethod
-    def attach_volumes(pod, req):
-        req['spec']['volumes'] = pod.volumes
+
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -119,30 +113,7 @@ class KubernetesRequestFactory:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
 
-    @staticmethod
-    def extract_resources(pod, req):
-        if not pod.resources or pod.resources.is_empty_resource_request():
-            return
-
-        req['spec']['containers'][0]['resources'] = {}
-
-        if pod.resources.has_requests():
-            req['spec']['containers'][0]['resources']['requests'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'memory'] = pod.resources.request_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'cpu'] = pod.resources.request_cpu
-
-        if pod.resources.has_limits():
-            req['spec']['containers'][0]['resources']['limits'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'memory'] = pod.resources.limit_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'cpu'] = pod.resources.limit_cpu
+
 
     @staticmethod
     def extract_init_containers(pod, req):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ada7aed0/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 6a9f76d..cdb1d65 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -14,26 +14,7 @@
 # limitations under the License.
 
 
-class Resources:
-    def __init__(
-            self,
-            request_memory=None,
-            request_cpu=None,
-            limit_memory=None,
-            limit_cpu=None):
-        self.request_memory = request_memory
-        self.request_cpu = request_cpu
-        self.limit_memory = limit_memory
-        self.limit_cpu = limit_cpu
-
-    def is_empty_resource_request(self):
-        return not self.has_limits() and not self.has_requests()
-
-    def has_limits(self):
-        return self.limit_cpu is not None or self.limit_memory is not None
 
-    def has_requests(self):
-        return self.request_cpu is not None or self.request_memory is not None
 
 
 class Pod: