Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 07:02:46 UTC

[50/50] incubator-airflow git commit: [AIRFLOW-1899] Fix Kubernetes tests

[AIRFLOW-1899] Fix Kubernetes tests

[AIRFLOW-1899] Add full deployment

- Make the home directory configurable
- Fix the documentation
- Add licenses

[AIRFLOW-1899] Tests for the Kubernetes Executor

Add an integration test for the Kubernetes executor.
This is done by spinning up different versions of
Kubernetes and running a DAG by invoking the REST API.

Closes #3301 from Fokko/fix-kubernetes-executor
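
The test drives Airflow purely through the experimental REST API. A rough sketch of
that flow in Python (the endpoints and the NodePort 30809 are taken from the test and
airflow.yaml in this commit; the DAG and task names simply mirror the
example_python_operator case used there):

    import requests
    from subprocess import check_output

    host = '{}:30809'.format(check_output(['minikube', 'ip']).decode('utf-8').strip())
    base = 'http://{}/api/experimental'.format(host)

    # Unpause the example DAG, then trigger a new DAG run
    requests.get('{}/dags/example_python_operator/paused/false'.format(base))
    requests.post('{}/dags/example_python_operator/dag_runs'.format(base), json={})

    # Find the run we just triggered and poll one of its tasks until the executor finishes it
    run = requests.get('{}/latest_runs'.format(base)).json()['items'][0]
    state = requests.get(
        '{}/dags/example_python_operator/dag_runs/{}/tasks/print_the_context'.format(
            base, run['execution_date'])).json()['state']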


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16bae563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16bae563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16bae563

Branch: refs/heads/v1-10-test
Commit: 16bae5634df24132b37eb752fe816f51bf7e83ca
Parents: d1f7af3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri May 4 08:58:12 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri May 4 08:58:12 2018 +0200

----------------------------------------------------------------------
 .gitignore                                      |   1 -
 .travis.yml                                     |   3 -
 airflow/config_templates/default_airflow.cfg    |   8 +
 airflow/configuration.py                        |   9 +-
 .../contrib/executors/kubernetes_executor.py    |  80 ++---
 airflow/contrib/kubernetes/kube_client.py       |   4 +-
 .../contrib/kubernetes/worker_configuration.py  |  19 +-
 airflow/jobs.py                                 |   4 +-
 airflow/www_rbac/api/experimental/endpoints.py  |  27 +-
 scripts/ci/kubernetes/docker/Dockerfile         |   5 +
 scripts/ci/kubernetes/docker/airflow-init.sh    |  24 ++
 scripts/ci/kubernetes/docker/build.sh           |  11 +-
 scripts/ci/kubernetes/kube/airflow.yaml         | 102 +-----
 scripts/ci/kubernetes/kube/configmaps.yaml      | 359 +++++++++++++++++++
 scripts/ci/kubernetes/kube/deploy.sh            |   6 +
 scripts/ci/kubernetes/kube/postgres.yaml        |   1 +
 scripts/ci/kubernetes/kube/secrets.yaml         |  25 ++
 scripts/ci/kubernetes/kube/volumes.yaml         |   1 +
 scripts/ci/travis_script.sh                     |   2 +-
 setup.cfg                                       |  13 +-
 tests/cli/test_cli.py                           | 101 +++++-
 tests/contrib/kubernetes/__init__.py            |  14 -
 tests/contrib/minikube/__init__.py              |  18 +
 .../minikube/test_kubernetes_executor.py        |  97 +++++
 .../minikube/test_kubernetes_pod_operator.py    |  98 +++++
 tests/contrib/minikube_tests/__init__.py        |  18 -
 .../minikube_tests/integration/__init__.py      |  13 -
 .../integration/airflow_controller.py           | 166 ---------
 .../test_kubernetes_executor_integration.py     |  67 ----
 .../test_kubernetes_pod_operator.py             |  93 -----
 .../www_rbac/api/experimental/test_endpoints.py |  21 +-
 31 files changed, 866 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 04c408f..a42962f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,4 +137,3 @@ rat-results.txt
 *.generated
 *.tar.gz
 scripts/ci/kubernetes/kube/.generated/airflow.yaml
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6d29a7a..77033df 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -80,9 +80,6 @@ matrix:
       env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
     - python: "2.7"
       env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
-  allow_failures:
-    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
-    - env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fa5eea0..6da4287 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True
 
 [elasticsearch]
 elasticsearch_host =
+
 [kubernetes]
 # The repository and tag of the Kubernetes Image for the Worker to Run
 worker_container_repository =
@@ -550,6 +553,11 @@ image_pull_secrets =
 # Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
 gcp_service_account_keys =
 
+# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
+# It’s intended for clients that expect to be running inside a pod running on kubernetes.
+# It will raise an exception if called from a process not running in a kubernetes environment.
+in_cluster = True
+
 [kubernetes_secrets]
 # The scheduler mounts the following secrets into your workers as they are launched by the
 # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 130356c..20ef067 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -442,7 +442,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
     )
     with open(AIRFLOW_CONFIG, 'w') as f:
         cfg = parameterized_config(DEFAULT_CONFIG)
-        f.write(cfg.split(TEMPLATE_START)[-1].strip())
+        cfg = cfg.split(TEMPLATE_START)[-1].strip()
+        if six.PY2:
+            cfg = cfg.encode('utf8')
+        f.write(cfg)
 
 log.info("Reading the config from %s", AIRFLOW_CONFIG)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index cdce95f..17b2908 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ from uuid import uuid4
 import kubernetes
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
+from airflow.configuration import conf
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
@@ -87,20 +88,6 @@ class KubeConfig:
     core_section = 'core'
     kubernetes_section = 'kubernetes'
 
-    @staticmethod
-    def safe_get(section, option, default):
-        try:
-            return configuration.get(section, option)
-        except AirflowConfigException:
-            return default
-
-    @staticmethod
-    def safe_getboolean(section, option, default):
-        try:
-            return configuration.getboolean(section, option)
-        except AirflowConfigException:
-            return default
-
     def __init__(self):
         configuration_dict = configuration.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict['core']
@@ -114,40 +101,37 @@ class KubeConfig:
             self.kubernetes_section, 'worker_container_tag')
         self.kube_image = '{}:{}'.format(
             self.worker_container_repository, self.worker_container_tag)
-        self.delete_worker_pods = self.safe_getboolean(
-            self.kubernetes_section, 'delete_worker_pods', True)
+        self.delete_worker_pods = conf.getboolean(
+            self.kubernetes_section, 'delete_worker_pods')
 
-        self.worker_service_account_name = self.safe_get(
-            self.kubernetes_section, 'worker_service_account_name', 'default')
-        self.image_pull_secrets = self.safe_get(
-            self.kubernetes_section, 'image_pull_secrets', '')
+        self.worker_service_account_name = conf.get(
+            self.kubernetes_section, 'worker_service_account_name')
+        self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
 
         # NOTE: `git_repo` and `git_branch` must be specified together as a pair
         # The http URL of the git repository to clone from
-        self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+        self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
         # The branch of the repository to be checked out
-        self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+        self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
         # Optionally, the directory in the git repository containing the dags
-        self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
+        self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
 
         # Optionally a user may supply a `git_user` and `git_password` for private
         # repositories
-        self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
-        self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
+        self.git_user = conf.get(self.kubernetes_section, 'git_user')
+        self.git_password = conf.get(self.kubernetes_section, 'git_password')
 
         # NOTE: The user may optionally use a volume claim to mount a PV containing
         # DAGs directly
-        self.dags_volume_claim = self.safe_get(self.kubernetes_section,
-                                               'dags_volume_claim', None)
+        self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
 
         # This prop may optionally be set for PV Claims and is used to write logs
-        self.logs_volume_claim = self.safe_get(
-            self.kubernetes_section, 'logs_volume_claim', None)
+        self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
 
         # This prop may optionally be set for PV Claims and is used to locate DAGs
         # on a SubPath
-        self.dags_volume_subpath = self.safe_get(
-            self.kubernetes_section, 'dags_volume_subpath', None)
+        self.dags_volume_subpath = conf.get(
+            self.kubernetes_section, 'dags_volume_subpath')
 
         # This prop may optionally be set for PV Claims and is used to write logs
         self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
@@ -156,36 +140,32 @@ class KubeConfig:
         # that if your
         # cluster has RBAC enabled, your scheduler may need service account permissions to
         # create, watch, get, and delete pods in this namespace.
-        self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
-                                            'default')
+        self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
         # The Kubernetes Namespace in which pods will be created by the executor. Note
         # that if your
         # cluster has RBAC enabled, your workers may need service account permissions to
         # interact with cluster components.
-        self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
-                                                'default')
+        self.executor_namespace = conf.get(self.kubernetes_section, 'namespace')
         # Task secrets managed by KubernetesExecutor.
-        self.gcp_service_account_keys = self.safe_get(
-            self.kubernetes_section, 'gcp_service_account_keys', None)
+        self.gcp_service_account_keys = conf.get(self.kubernetes_section,
+                                                 'gcp_service_account_keys')
 
         # If the user is using the git-sync container to clone their repository via git,
         # allow them to specify repository, tag, and pod name for the init container.
-        self.git_sync_container_repository = self.safe_get(
-            self.kubernetes_section, 'git_sync_container_repository',
-            'gcr.io/google-containers/git-sync-amd64')
+        self.git_sync_container_repository = conf.get(
+            self.kubernetes_section, 'git_sync_container_repository')
 
-        self.git_sync_container_tag = self.safe_get(
-            self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
+        self.git_sync_container_tag = conf.get(
+            self.kubernetes_section, 'git_sync_container_tag')
         self.git_sync_container = '{}:{}'.format(
             self.git_sync_container_repository, self.git_sync_container_tag)
 
-        self.git_sync_init_container_name = self.safe_get(
-            self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
+        self.git_sync_init_container_name = conf.get(
+            self.kubernetes_section, 'git_sync_init_container_name')
 
         # The worker pod may optionally have a  valid Airflow config loaded via a
         # configmap
-        self.airflow_configmap = self.safe_get(self.kubernetes_section,
-                                               'airflow_configmap', None)
+        self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
 
         self._validate()
 
@@ -272,7 +252,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
         elif status == 'Succeeded':
             self.log.info('Event: %s Succeeded', pod_id)
-            self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
+            self.watcher_queue.put((pod_id, None, labels, resource_version))
         elif status == 'Running':
             self.log.info('Event: %s is Running', pod_id)
         else:
@@ -552,7 +532,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
         # always need to reset resource version since we don't know
         # when we last started, note for behavior below
-        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
+        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
+        # /CoreV1Api.md#list_namespaced_pod
         KubeResourceVersion.reset_resource_version(self._session)
         self.task_queue = Queue()
         self.result_queue = Queue()
@@ -610,8 +591,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             task_id=task_id,
             execution_date=ex_time
         ).one()
-
-        if item.state == State.RUNNING or item.state == State.QUEUED:
+        if state:
             item.state = state
             self._session.add(item)
             self._session.commit()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index d1a63a2..1d3cc9d 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -14,7 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+from airflow.configuration import conf
 
 def _load_kube_config(in_cluster):
     from kubernetes import config, client
@@ -26,6 +26,6 @@ def _load_kube_config(in_cluster):
         return client.CoreV1Api()
 
 
-def get_kube_client(in_cluster=True):
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
     # TODO: This should also allow people to point to a cluster.
     return _load_kube_config(in_cluster)
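
With this change the default for in_cluster is read from the new [kubernetes] in_cluster
option instead of being hard-coded to True. A minimal usage sketch (assumes the option is
set as in default_airflow.cfg above; the namespace here is just an example):

    from airflow.contrib.kubernetes.kube_client import get_kube_client

    # The default is resolved from airflow.cfg when the module is imported;
    # pass in_cluster explicitly to override it, e.g. when running outside a pod.
    kube = get_kube_client(in_cluster=False)
    pods = kube.list_namespaced_pod('default')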

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index cd2cb9f..ac4dacf 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -21,13 +21,18 @@ import six
 
 from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.contrib.kubernetes.secret import Secret
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class WorkerConfiguration:
+class WorkerConfiguration(LoggingMixin):
     """Contains Kubernetes Airflow Worker configuration logic"""
 
     def __init__(self, kube_config):
         self.kube_config = kube_config
+        self.worker_airflow_home = self.kube_config.airflow_home
+        self.worker_airflow_dags = self.kube_config.dags_folder
+        self.worker_airflow_logs = self.kube_config.base_log_folder
+        super(WorkerConfiguration, self).__init__()
 
     def _get_init_containers(self, volume_mounts):
         """When using git to retrieve the DAGs, use the GitSync Init Container"""
@@ -79,7 +84,7 @@ class WorkerConfiguration:
             'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
         }
         if self.kube_config.airflow_configmap:
-            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
         return env
 
     def _get_secrets(self):
@@ -129,19 +134,19 @@ class WorkerConfiguration:
         volume_mounts = [{
             'name': dags_volume_name,
             'mountPath': os.path.join(
-                self.kube_config.dags_folder,
+                self.worker_airflow_dags,
                 self.kube_config.git_subpath
             ),
             'readOnly': True
         }, {
             'name': logs_volume_name,
-            'mountPath': self.kube_config.base_log_folder
+            'mountPath': self.worker_airflow_logs
         }]
 
         # Mount the airflow.cfg file via a configmap the user has specified
         if self.kube_config.airflow_configmap:
             config_volume_name = 'airflow-config'
-            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
             volumes.append({
                 'name': config_volume_name,
                 'configMap': {
@@ -172,6 +177,10 @@ class WorkerConfiguration:
         annotations = {
             'iam.cloud.google.com/service-account': gcp_sa_key
         } if gcp_sa_key else {}
+        airflow_command = airflow_command.replace("-sd", "-i -sd")
+        airflow_path = airflow_command.split('-sd')[-1]
+        airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
+        airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
 
         return Pod(
             namespace=namespace,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ecbfef8..e5a07b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 5bc0529..bddf0c1 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,8 @@ from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils import timezone
 from airflow.www_rbac.app import csrf
+from airflow import models
+from airflow.utils.db import create_session
 
 from flask import g, Blueprint, jsonify, request, url_for
 
@@ -112,6 +114,27 @@ def task_info(dag_id, task_id):
     return jsonify(fields)
 
 
+@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
+@requires_authentication
+def dag_paused(dag_id, paused):
+    """(Un)pauses a dag"""
+
+    DagModel = models.DagModel
+    with create_session() as session:
+        orm_dag = (
+            session.query(DagModel)
+                   .filter(DagModel.dag_id == dag_id).first()
+        )
+        if paused == 'true':
+            orm_dag.is_paused = True
+        else:
+            orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+    return jsonify({'response': 'ok'})
+
+
 @api_experimental.route(
     '/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
     methods=['GET'])
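
The new paused endpoint is what the integration test further down uses to enable
example_python_operator before triggering it. A hedged usage sketch (assumes a local
webserver on port 8080 with the default API auth backend):

    import requests

    # Unpause a DAG; use 'true' in the last path segment to pause it again.
    resp = requests.get(
        'http://localhost:8080/api/experimental/'
        'dags/example_python_operator/paused/false')
    assert resp.status_code == 200 and resp.json() == {'response': 'ok'}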

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 967b698..6d2c62d 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
         unzip \
     && apt-get clean
 
+
+RUN pip install --upgrade pip
+
 # Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes
 RUN pip install -U setuptools && \
     pip install kubernetes && \
@@ -43,6 +46,8 @@ RUN pip install -U setuptools && \
 COPY airflow.tar.gz /tmp/airflow.tar.gz
 RUN pip install /tmp/airflow.tar.gz
 
+COPY airflow-init.sh /tmp/airflow-init.sh
+
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
new file mode 100755
index 0000000..dc33625
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
index 6f14c4d..b93c6b1 100755
--- a/scripts/ci/kubernetes/docker/build.sh
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -27,7 +27,12 @@ if [ $? -eq 0 ]; then
   eval $ENVCONFIG
 fi
 
-cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
-cd $DIRNAME && \
-docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} && \
+echo "Airflow directory $AIRFLOW_ROOT"
+echo "Airflow Docker directory $DIRNAME"
+
+cd $AIRFLOW_ROOT
+python setup.py sdist -q
+echo "Copy distro $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz"
+cp $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz
+cd $DIRNAME && docker build --pull $DIRNAME --tag=${IMAGE}:${TAG}
 rm $DIRNAME/airflow.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 77566ae..09bbcd8 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -61,7 +61,7 @@ spec:
           - "bash"
         args:
           - "-cx"
-          - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads"
+          - "./tmp/airflow-init.sh"
       containers:
       - name: webserver
         image: airflow
@@ -88,20 +88,20 @@ spec:
           mountPath: /root/airflow/dags
         - name: airflow-logs
           mountPath: /root/airflow/logs
-        readinessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          periodSeconds: 5
-          httpGet:
-            path: /admin
-            port: 8080
-        livenessProbe:
-          initialDelaySeconds: 5
-          timeoutSeconds: 5
-          failureThreshold: 5
-          httpGet:
-            path: /admin
-            port: 8080
+#        readinessProbe:
+#          initialDelaySeconds: 5
+#          timeoutSeconds: 5
+#          periodSeconds: 5
+#          httpGet:
+#            path: /login
+#            port: 8080
+#        livenessProbe:
+#          initialDelaySeconds: 5
+#          timeoutSeconds: 5
+#          failureThreshold: 5
+#          httpGet:
+#            path: /login
+#            port: 8080
       - name: scheduler
         image: airflow
         imagePullPolicy: IfNotPresent
@@ -146,76 +146,4 @@ spec:
       nodePort: 30809
   selector:
     name: airflow
----
-apiVersion: v1
-kind: Secret
-metadata:
-  name: airflow-secrets
-type: Opaque
-data:
-  # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
-  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: airflow-configmap
-data:
-  airflow.cfg: |
-    [core]
-    airflow_home = /root/airflow
-    dags_folder = /root/airflow/dags
-    base_log_folder = /root/airflow/logs
-    logging_level = INFO
-    executor = KubernetesExecutor
-    parallelism = 32
-    plugins_folder = /root/airflow/plugins
-    sql_alchemy_conn = $SQL_ALCHEMY_CONN
-
-    [scheduler]
-    dag_dir_list_interval = 300
-    child_process_log_directory = /root/airflow/logs/scheduler
-    # Task instances listen for external kill signal (when you clear tasks
-    # from the CLI or the UI), this defines the frequency at which they should
-    # listen (in seconds).
-    job_heartbeat_sec = 5
-    max_threads = 16
-
-    # The scheduler constantly tries to trigger new tasks (look at the
-    # scheduler section in the docs for more information). This defines
-    # how often the scheduler should run (in seconds).
-    scheduler_heartbeat_sec = 5
-
-    # after how much time should the scheduler terminate in seconds
-    # -1 indicates to run continuously (see also num_runs)
-    run_duration = -1
-
-    # after how much time a new DAGs should be picked up from the filesystem
-    min_file_process_interval = 0
-
-    statsd_on = False
-    statsd_host = localhost
-    statsd_port = 8125
-    statsd_prefix = airflow
-
-    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-    min_file_parsing_loop_time = 1
-
-    print_stats_interval = 30
-    scheduler_zombie_task_threshold = 300
-    max_tis_per_query = 0
-    authenticate = False
-
-    [kubernetes]
-    airflow_configmap = airflow-configmap
-    worker_container_repository = airflow
-    worker_container_tag = latest
-    delete_worker_pods = True
-    git_repo = https://github.com/grantnicholas/testdags.git
-    git_branch = master
-    dags_volume_claim = airflow-dags
-    logs_volume_claim = airflow-logs
 
-    [kubernetes_secrets]
-    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/configmaps.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
new file mode 100644
index 0000000..ddba098
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -0,0 +1,359 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: airflow-configmap
+data:
+  airflow.cfg: |
+    [core]
+    airflow_home = /root/airflow
+    dags_folder = /root/airflow/dags
+    base_log_folder = /root/airflow/logs
+    logging_level = INFO
+    executor = KubernetesExecutor
+    parallelism = 32
+    load_examples = True
+    plugins_folder = /root/airflow/plugins
+    sql_alchemy_conn = $SQL_ALCHEMY_CONN
+
+    [scheduler]
+    dag_dir_list_interval = 300
+    child_process_log_directory = /root/airflow/logs/scheduler
+    # Task instances listen for external kill signal (when you clear tasks
+    # from the CLI or the UI), this defines the frequency at which they should
+    # listen (in seconds).
+    job_heartbeat_sec = 5
+    max_threads = 2
+
+    # The scheduler constantly tries to trigger new tasks (look at the
+    # scheduler section in the docs for more information). This defines
+    # how often the scheduler should run (in seconds).
+    scheduler_heartbeat_sec = 5
+
+    # after how much time should the scheduler terminate in seconds
+    # -1 indicates to run continuously (see also num_runs)
+    run_duration = -1
+
+    # after how much time a new DAGs should be picked up from the filesystem
+    min_file_process_interval = 0
+
+    statsd_on = False
+    statsd_host = localhost
+    statsd_port = 8125
+    statsd_prefix = airflow
+
+    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+    min_file_parsing_loop_time = 1
+
+    print_stats_interval = 30
+    scheduler_zombie_task_threshold = 300
+    max_tis_per_query = 0
+    authenticate = False
+
+    # Turn off scheduler catchup by setting this to False.
+    # Default behavior is unchanged and
+    # Command Line Backfills still work, but the scheduler
+    # will not do scheduler catchup if this is False,
+    # however it can be set on a per DAG basis in the
+    # DAG definition (catchup)
+    catchup_by_default = True
+
+    [webserver]
+    # The base url of your website as airflow cannot guess what domain or
+    # cname you are using. This is used in automated emails that
+    # airflow sends to point links to the right web server
+    base_url = http://localhost:8080
+
+    # The ip specified when starting the web server
+    web_server_host = 0.0.0.0
+
+    # The port on which to run the web server
+    web_server_port = 8080
+
+    # Paths to the SSL certificate and key for the web server. When both are
+    # provided SSL will be enabled. This does not change the web server port.
+    web_server_ssl_cert =
+    web_server_ssl_key =
+
+    # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+    web_server_master_timeout = 120
+
+    # Number of seconds the gunicorn webserver waits before timing out on a worker
+    web_server_worker_timeout = 120
+
+    # Number of workers to refresh at a time. When set to 0, worker refresh is
+    # disabled. When nonzero, airflow periodically refreshes webserver workers by
+    # bringing up new ones and killing old ones.
+    worker_refresh_batch_size = 1
+
+    # Number of seconds to wait before refreshing a batch of workers.
+    worker_refresh_interval = 30
+
+    # Secret key used to run your flask app
+    secret_key = temporary_key
+
+    # Number of workers to run the Gunicorn web server
+    workers = 4
+
+    # The worker class gunicorn should use. Choices include
+    # sync (default), eventlet, gevent
+    worker_class = sync
+
+    # Log files for the gunicorn webserver. '-' means log to stderr.
+    access_logfile = -
+    error_logfile = -
+
+    # Expose the configuration file in the web server
+    expose_config = False
+
+    # Set to true to turn on authentication:
+    # https://airflow.incubator.apache.org/security.html#web-authentication
+    authenticate = False
+
+    # Filter the list of dags by owner name (requires authentication to be enabled)
+    filter_by_owner = False
+
+    # Filtering mode. Choices include user (default) and ldapgroup.
+    # Ldap group filtering requires using the ldap backend
+    #
+    # Note that the ldap server needs the "memberOf" overlay to be set up
+    # in order to user the ldapgroup mode.
+    owner_mode = user
+
+    # Default DAG view.  Valid values are:
+    # tree, graph, duration, gantt, landing_times
+    dag_default_view = tree
+
+    # Default DAG orientation. Valid values are:
+    # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
+    dag_orientation = LR
+
+    # Puts the webserver in demonstration mode; blurs the names of Operators for
+    # privacy.
+    demo_mode = False
+
+    # The amount of time (in secs) webserver will wait for initial handshake
+    # while fetching logs from other worker machine
+    log_fetch_timeout_sec = 5
+
+    # By default, the webserver shows paused DAGs. Flip this to hide paused
+    # DAGs by default
+    hide_paused_dags_by_default = False
+
+    # Consistent page size across all listing views in the UI
+    page_size = 100
+
+    # Use FAB-based webserver with RBAC feature
+    rbac = True
+
+    [smtp]
+    # If you want airflow to send emails on retries, failure, and you want to use
+    # the airflow.utils.email.send_email_smtp function, you have to configure an
+    # smtp server here
+    smtp_host = localhost
+    smtp_starttls = True
+    smtp_ssl = False
+    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
+    # smtp_user = airflow
+    # smtp_password = airflow
+    smtp_port = 25
+    smtp_mail_from = airflow@example.com
+
+    [kubernetes]
+    airflow_configmap = airflow-configmap
+    worker_container_repository = airflow
+    worker_container_tag = latest
+    delete_worker_pods = True
+    git_repo = https://github.com/apache/incubator-airflow.git
+    git_branch = master
+    git_subpath = airflow/example_dags/
+    git_user =
+    git_password =
+    dags_volume_claim = airflow-dags
+    logs_volume_claim = airflow-logs
+    in_cluster = True
+    namespace = default
+    gcp_service_account_keys =
+
+    # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
+    git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
+    git_sync_container_tag = v2.0.5
+    git_sync_init_container_name = git-sync-clone
+
+    [kubernetes_secrets]
+    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
+
+    [hive]
+    # Default mapreduce queue for HiveOperator tasks
+    default_hive_mapred_queue =
+
+    [celery]
+    # This section only applies if you are using the CeleryExecutor in
+    # [core] section above
+
+    # The app name that will be used by celery
+    celery_app_name = airflow.executors.celery_executor
+
+    # The concurrency that will be used when starting workers with the
+    # "airflow worker" command. This defines the number of task instances that
+    # a worker will take, so size up your workers based on the resources on
+    # your worker box and the nature of your tasks
+    worker_concurrency = 16
+
+    # When you start an airflow worker, airflow starts a tiny web server
+    # subprocess to serve the workers local log files to the airflow main
+    # web server, who then builds pages and sends them to users. This defines
+    # the port on which the logs are served. It needs to be unused, and open
+    # visible from the main web server to connect into the workers.
+    worker_log_server_port = 8793
+
+    # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
+    # a sqlalchemy database. Refer to the Celery documentation for more
+    # information.
+    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
+    broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+
+    # The Celery result_backend. When a job finishes, it needs to update the
+    # metadata of the job. Therefore it will post a message on a message bus,
+    # or insert it into a database (depending of the backend)
+    # This status is used by the scheduler to update the state of the task
+    # The use of a database is highly recommended
+    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+    result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+
+    # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
+    # it `airflow flower`. This defines the IP that Celery Flower runs on
+    flower_host = 0.0.0.0
+
+    # The root URL for Flower
+    # Ex: flower_url_prefix = /flower
+    flower_url_prefix =
+
+    # This defines the port that Celery Flower runs on
+    flower_port = 5555
+
+    # Default queue that tasks get assigned to and that worker listen on.
+    default_queue = default
+
+    # Import path for celery configuration options
+    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+
+    [celery_broker_transport_options]
+    # The visibility timeout defines the number of seconds to wait for the worker
+    # to acknowledge the task before the message is redelivered to another worker.
+    # Make sure to increase the visibility timeout to match the time of the longest
+    # ETA you're planning to use. Especially important in case of using Redis or SQS
+    visibility_timeout = 21600
+
+    # In case of using SSL
+    ssl_active = False
+    ssl_key =
+    ssl_cert =
+    ssl_cacert =
+
+    [dask]
+    # This section only applies if you are using the DaskExecutor in
+    # [core] section above
+
+    # The IP address and port of the Dask cluster's scheduler.
+    cluster_address = 127.0.0.1:8786
+    # TLS/ SSL settings to access a secured Dask scheduler.
+    tls_ca =
+    tls_cert =
+    tls_key =
+
+    [ldap]
+    # set this to ldaps://<your.ldap.server>:<port>
+    uri =
+    user_filter = objectClass=*
+    user_name_attr = uid
+    group_member_attr = memberOf
+    superuser_filter =
+    data_profiler_filter =
+    bind_user = cn=Manager,dc=example,dc=com
+    bind_password = insecure
+    basedn = dc=example,dc=com
+    cacert = /etc/ca/ldap_ca.crt
+    search_scope = LEVEL
+
+    [mesos]
+    # Mesos master address which MesosExecutor will connect to.
+    master = localhost:5050
+
+    # The framework name which Airflow scheduler will register itself as on mesos
+    framework_name = Airflow
+
+    # Number of cpu cores required for running one task instance using
+    # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+    # command on a mesos slave
+    task_cpu = 1
+
+    # Memory in MB required for running one task instance using
+    # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+    # command on a mesos slave
+    task_memory = 256
+
+    # Enable framework checkpointing for mesos
+    # See http://mesos.apache.org/documentation/latest/slave-recovery/
+    checkpoint = False
+
+    # Failover timeout in milliseconds.
+    # When checkpointing is enabled and this option is set, Mesos waits
+    # until the configured timeout for
+    # the MesosExecutor framework to re-register after a failover. Mesos
+    # shuts down running tasks if the
+    # MesosExecutor framework fails to re-register within this timeframe.
+    # failover_timeout = 604800
+
+    # Enable framework authentication for mesos
+    # See http://mesos.apache.org/documentation/latest/configuration/
+    authenticate = False
+
+    # Mesos credentials, if authentication is enabled
+    # default_principal = admin
+    # default_secret = admin
+
+    # Optional Docker Image to run on slave before running the command
+    # This image should be accessible from mesos slave i.e mesos slave
+    # should be able to pull this docker image before executing the command.
+    # docker_image_slave = puckel/docker-airflow
+
+    [kerberos]
+    ccache = /tmp/airflow_krb5_ccache
+    # gets augmented with fqdn
+    principal = airflow
+    reinit_frequency = 3600
+    kinit_path = kinit
+    keytab = airflow.keytab
+
+    [cli]
+    api_client = airflow.api.client.json_client
+    endpoint_url = http://localhost:8080
+
+    [api]
+    auth_backend = airflow.api.auth.backend.default
+
+    [github_enterprise]
+    api_rev = v3
+
+    [admin]
+    # UI to hide sensitive variable fields when set to True
+    hide_sensitive_variable_fields = True
+
+    [elasticsearch]
+    elasticsearch_host =

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index a5adcf8..e585d87 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -21,8 +21,14 @@ IMAGE=${1:-airflow/ci}
 TAG=${2:-latest}
 DIRNAME=$(cd "$(dirname "$0")"; pwd)
 
+kubectl delete -f $DIRNAME/postgres.yaml
+kubectl delete -f $DIRNAME/airflow.yaml
+kubectl delete -f $DIRNAME/secrets.yaml
+
 kubectl apply -f $DIRNAME/postgres.yaml
 kubectl apply -f $DIRNAME/volumes.yaml
+kubectl apply -f $DIRNAME/secrets.yaml
+kubectl apply -f $DIRNAME/configmaps.yaml
 kubectl apply -f $DIRNAME/airflow.yaml
 
 # wait for up to 10 minutes for everything to be deployed

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
index 67a0635..1130921 100644
--- a/scripts/ci/kubernetes/kube/postgres.yaml
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -30,6 +30,7 @@ spec:
       containers:
         - name: postgres
           image: postgres
+          imagePullPolicy: IfNotPresent
           ports:
             - containerPort: 5432
               protocol: TCP

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/secrets.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/secrets.yaml b/scripts/ci/kubernetes/kube/secrets.yaml
new file mode 100644
index 0000000..4d533b3
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/secrets.yaml
@@ -0,0 +1,25 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+apiVersion: v1
+kind: Secret
+metadata:
+  name: airflow-secrets
+type: Opaque
+data:
+  # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
+  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 073e98c..58ad368 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,3 +62,4 @@ spec:
   resources:
     requests:
       storage: 2Gi
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 8766e94..52571cc 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -26,7 +26,7 @@ then
   tox -e $TOX_ENV
 else
   KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+  tox -e $TOX_ENV -- tests.contrib.minikube \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/setup.cfg
----------------------------------------------------------------------
diff --git a/setup.cfg b/setup.cfg
index 7b3479c..622cc13 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -6,16 +5,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 [metadata]
 name = Airflow
 summary = Airflow is a system to programmatically author, schedule and monitor data pipelines.
@@ -29,8 +27,11 @@ packages = airflow
 
 [build_sphinx]
 source-dir = docs/
-build-dir  = docs/_build
-all_files  = 1
+build-dir = docs/_build
+all_files = 1
 
 [upload_sphinx]
 upload-dir = docs/_build/html
+
+[easy_install]
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/cli/test_cli.py
----------------------------------------------------------------------
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 9c79567..34c82bc 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,15 +22,84 @@ import unittest
 
 from mock import patch, Mock, MagicMock
 from time import sleep
-
 import psutil
-
+from argparse import Namespace
 from airflow import settings
-from airflow.bin.cli import get_num_ready_workers_running
+from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.settings import Session
+from airflow import models
+
+import os
+
+dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1])
+
+TEST_DAG_FOLDER = os.path.join(
+    os.path.dirname(dag_folder_path), 'dags')
+TEST_DAG_ID = 'unit_tests'
+
+
+def reset(dag_id):
+    session = Session()
+    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+    tis.delete()
+    session.commit()
+    session.close()
+
+
+def create_mock_args(
+    task_id,
+    dag_id,
+    subdir,
+    execution_date,
+    task_params=None,
+    dry_run=False,
+    queue=None,
+    pool=None,
+    priority_weight_total=None,
+    retries=0,
+    local=True,
+    mark_success=False,
+    ignore_all_dependencies=False,
+    ignore_depends_on_past=False,
+    ignore_dependencies=False,
+    force=False,
+    run_as_user=None,
+    executor_config={},
+    cfg_path=None,
+    pickle=None,
+    raw=None,
+    interactive=None,
+):
+    args = MagicMock(spec=Namespace)
+    args.task_id = task_id
+    args.dag_id = dag_id
+    args.subdir = subdir
+    args.task_params = task_params
+    args.execution_date = execution_date
+    args.dry_run = dry_run
+    args.queue = queue
+    args.pool = pool
+    args.priority_weight_total = priority_weight_total
+    args.retries = retries
+    args.local = local
+    args.run_as_user = run_as_user
+    args.executor_config = executor_config
+    args.cfg_path = cfg_path
+    args.pickle = pickle
+    args.raw = raw
+    args.mark_success = mark_success
+    args.ignore_all_dependencies = ignore_all_dependencies
+    args.ignore_depends_on_past = ignore_depends_on_past
+    args.ignore_dependencies = ignore_dependencies
+    args.force = force
+    args.interactive = interactive
+    return args
 
 
 class TestCLI(unittest.TestCase):
-
     def setUp(self):
         self.gunicorn_master_proc = Mock(pid=None)
         self.children = MagicMock()
@@ -74,3 +143,23 @@ class TestCLI(unittest.TestCase):
             "webserver terminated with return code {} in debug mode".format(return_code))
         p.terminate()
         p.wait()
+
+    def test_local_run(self):
+        args = create_mock_args(
+            task_id='print_the_context',
+            dag_id='example_python_operator',
+            subdir='/root/dags/example_python_operator.py',
+            interactive=True,
+            execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00')
+        )
+
+        reset(args.dag_id)
+
+        with patch('argparse.Namespace', args) as mock_args:
+            run(mock_args)
+            dag = get_dag(mock_args)
+            task = dag.get_task(task_id=args.task_id)
+            ti = TaskInstance(task, args.execution_date)
+            ti.refresh_from_db()
+            state = ti.current_state()
+            self.assertEqual(state, State.SUCCESS)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py
deleted file mode 100644
index 759b563..0000000
--- a/tests/contrib/kubernetes/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/__init__.py b/tests/contrib/minikube/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/contrib/minikube/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
new file mode 100644
index 0000000..9827bc8
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from subprocess import check_call, check_output
+
+import requests
+import time
+import six
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except Exception as e:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster;"
+        "Skipping tests {}".format(e)
+    )
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+
+    def test_integration_run_dag(self):
+        host_ip = check_output(['minikube', 'ip'])
+        if six.PY3:
+            host_ip = host_ip.decode('UTF-8')
+        host = '{}:30809'.format(host_ip.strip())
+
+        # Enable the DAG
+        result = requests.get(
+            'http://{}/api/experimental/'
+            'dags/example_python_operator/paused/false'.format(host)
+        )
+        self.assertEqual(result.status_code, 200, "Could not enable DAG")
+
+        # Trigger a new dagrun
+        result = requests.post(
+            'http://{}/api/experimental/'
+            'dags/example_python_operator/dag_runs'.format(host),
+            json={}
+        )
+        self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+
+        time.sleep(1)
+
+        result = requests.get(
+            'http://{}/api/experimental/latest_runs'.format(host)
+        )
+        self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+        result_json = result.json()
+
+        self.assertGreater(len(result_json['items']), 0)
+
+        execution_date = result_json['items'][0]['execution_date']
+        print("Found the job with execution date {}".format(execution_date))
+
+        tries = 0
+        state = ''
+        # Wait up to 100 seconds (20 polls, 5 seconds apart) for the task to succeed
+        while tries < 20:
+            time.sleep(5)
+
+            # Check the state of the task in the triggered DAG-run
+            result = requests.get(
+                'http://{}/api/experimental/dags/example_python_operator/'
+                'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
+            )
+            self.assertEqual(result.status_code, 200, "Could not get the status")
+            result_json = result.json()
+            state = result_json['state']
+            print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+            if state == 'success':
+                break
+            tries += 1
+
+        self.assertEqual(state, 'success')
+
+        # Retrieving the task logs would also be worth checking, but that requires extending the experimental API
+
+
+if __name__ == '__main__':
+    unittest.main()

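The polling loop above can also be read as a small helper against the experimental REST API. The sketch below is hypothetical and not part of the patch; it uses only the endpoints the test already calls, while the host, timeout values and function name are illustrative.

    # Hypothetical helper illustrating the polling pattern used in the test above
    # (not part of the patch).
    import time
    import requests


    def wait_for_task_state(host, dag_id, execution_date, task_id,
                            expected='success', tries=20, interval=5):
        """Poll the experimental API until the task reaches the expected state."""
        url = ('http://{host}/api/experimental/dags/{dag_id}/'
               'dag_runs/{date}/tasks/{task_id}').format(
            host=host, dag_id=dag_id, date=execution_date, task_id=task_id)
        state = ''
        for _ in range(tries):
            result = requests.get(url)
            result.raise_for_status()
            state = result.json()['state']
            if state == expected:
                return state
            time.sleep(interval)
        return state


    # Example (host/port as in the test above):
    # wait_for_task_state('192.168.99.100:30809', 'example_python_operator',
    #                     execution_date, 'print_the_context')
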
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..081fc04
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except Exception as e:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster;"
+        "Skipping tests {}".format(e)
+    )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+    def test_working_pod(self):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task"
+        )
+        k.execute(None)
+
+    def test_logging(self):
+        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+            k = KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo", "10"],
+                labels={"foo": "bar"},
+                name="test",
+                task_id="task",
+                get_logs=True
+            )
+            k.execute(None)
+            mock_logger.info.assert_any_call(b"+ echo\n")
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image=bad_image_name,
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            startup_timeout_seconds=5
+        )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None)
+
+        print("exception: {}".format(cm.exception))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=[bad_internal_command, "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task"
+        )
+        with self.assertRaises(AirflowException):
+            k.execute(None)
+
+
+if __name__ == '__main__':
+    unittest.main()

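For context, the operator constructed in these tests is normally declared inside a DAG. The sketch below is hypothetical and not part of the patch; the DAG id, schedule and start date are illustrative, while the operator arguments mirror the ones the tests above pass.

    # Hypothetical usage sketch for KubernetesPodOperator (not part of the patch).
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    with DAG(dag_id='example_kubernetes_pod',
             start_date=datetime(2018, 5, 1),
             schedule_interval=None) as dag:

        # Same arguments as in the tests above: run a short echo inside an
        # ubuntu:16.04 pod in the 'default' namespace and stream its logs.
        echo_task = KubernetesPodOperator(
            namespace='default',
            image='ubuntu:16.04',
            cmds=['bash', '-cx'],
            arguments=['echo', '10'],
            labels={'foo': 'bar'},
            name='echo-test',
            task_id='echo_task',
            get_logs=True,
        )
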
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 4067cc7..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/integration/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py
deleted file mode 100644
index 5604652..0000000
--- a/tests/contrib/minikube_tests/integration/airflow_controller.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import subprocess
-import time
-
-
-class RunCommandError(Exception):
-    pass
-
-
-class TimeoutError(Exception):
-    pass
-
-
-class DagRunState:
-    SUCCESS = "success"
-    FAILED = "failed"
-    RUNNING = "running"
-
-
-def run_command(command):
-    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE)
-    stdout, stderr = process.communicate()
-    if process.returncode != 0:
-        raise RunCommandError(
-            "Error while running command: {}; Stdout: {}; Stderr: {}".format(
-                command, stdout, stderr
-            ))
-    return stdout, stderr
-
-
-def run_command_in_pod(pod_name, container_name, command):
-    return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format(
-        pod_name=pod_name, container_name=container_name, command=command
-    ))
-
-
-def get_scheduler_logs(airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-
-    return run_command("kubectl logs {pod_name} scheduler"
-                       .format(pod_name=airflow_pod))
-
-
-def _unpause_dag(dag_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    return run_command_in_pod(airflow_pod, "scheduler",
-                              "airflow unpause {dag_id}".format(dag_id=dag_id))
-
-
-def run_dag(dag_id, run_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    _unpause_dag(dag_id, airflow_pod)
-    return run_command_in_pod(airflow_pod, "scheduler",
-                              "airflow trigger_dag {dag_id} -r {run_id}".format(
-                                  dag_id=dag_id, run_id=run_id
-                              ))
-
-
-def _get_pod_by_grep(grep_phrase):
-    stdout, stderr = run_command(
-        "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
-            grep_phrase=grep_phrase
-        ))
-    pod_name = stdout.strip()
-    return pod_name
-
-
-def _get_airflow_pod():
-    return _get_pod_by_grep("^airflow")
-
-
-def _get_postgres_pod():
-    return _get_pod_by_grep("^postgres")
-
-
-def _parse_state(stdout):
-    end_line = "(1 row)"
-    prev_line = None
-    for line in stdout.split("\n"):
-        if end_line in line:
-            return prev_line.strip()
-        prev_line = line
-
-    raise Exception("Unknown psql output: {}".format(stdout))
-
-
-def get_dag_run_table(postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select * from dag_run" """
-    )
-    return stdout
-
-
-def get_task_instance_table(postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select * from task_instance" """
-    )
-    return stdout
-
-
-def get_dag_run_state(dag_id, run_id, postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and
-         run_id='{run_id}'" """.format(
-            dag_id=dag_id, run_id=run_id
-        )
-    )
-    return _parse_state(stdout)
-
-
-def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    for _ in range(0, timeout / poll_interval):
-        dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
-        if dag_state != DagRunState.RUNNING:
-            capture_logs_for_failure(dag_state)
-            return dag_state
-        time.sleep(poll_interval)
-
-    raise TimeoutError(
-        "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id,
-                                                                               run_id))
-
-
-def _kill_pod(pod_name):
-    return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name))
-
-
-def kill_scheduler():
-    airflow_pod = _get_pod_by_grep("^airflow")
-    return _kill_pod(airflow_pod)
-
-
-def capture_logs_for_failure(state):
-    if state != DagRunState.SUCCESS:
-        stdout, stderr = get_scheduler_logs()
-        print("stdout:")
-        for line in stdout.split('\n'):
-            print(line)
-        print("stderr:")
-        for line in stderr.split('\n'):
-            print(line)
-        print("dag_run:")
-        print(get_dag_run_table())
-        print("task_instance")
-        print(get_task_instance_table())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
deleted file mode 100644
index 602a717..0000000
--- a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import unittest
-from uuid import uuid4
-
-from tests.contrib.minikube_tests.integration.airflow_controller\
-    import DagRunState, RunCommandError, \
-    dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag
-
-try:
-    run_command("kubectl get pods")
-except RunCommandError:
-    SKIP_KUBE = True
-else:
-    SKIP_KUBE = False
-
-
-class KubernetesExecutorTest(unittest.TestCase):
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_dag_runs_successfully(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-        state = dag_final_state(dag_id, run_id, timeout=120)
-        self.assertEquals(state, DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-
-        time.sleep(10)
-
-        kill_scheduler()
-
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180),
-                          DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE,
-                     'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_config_works(self):
-        dag_id, run_id = "example_kubernetes_executor", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=300),
-                          DagRunState.SUCCESS)
-
-
-if __name__ == "__main__":
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 321f01f..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call
-import mock
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-
-try:
-    check_call(["kubectl", "get", "pods"])
-except Exception as e:
-    raise unittest.SkipTest(
-        "Kubernetes integration tests require a minikube cluster;"
-        "Skipping tests {}".format(e)
-    )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_logging(self):
-        with mock.patch.object(PodLauncher, 'log') as mock_logger:
-            k = KubernetesPodOperator(namespace='default',
-                                      image="ubuntu:16.04",
-                                      cmds=["bash", "-cx"],
-                                      arguments=["echo", "10"],
-                                      labels={"foo": "bar"},
-                                      name="test",
-                                      task_id="task",
-                                      get_logs=True
-                                      )
-            k.execute(None)
-            mock_logger.info.assert_any_call("+ echo\n")
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/www_rbac/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index 7bcbb8e..a19492e 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -77,6 +77,23 @@ class TestApiExperimental(unittest.TestCase):
         self.assertIn('error', response.data.decode('utf-8'))
         self.assertEqual(404, response.status_code)
 
+    def test_task_paused(self):
+        url_template = '/api/experimental/dags/{}/paused/{}'
+
+        response = self.app.get(
+            url_template.format('example_bash_operator', 'true')
+        )
+        self.assertIn('ok', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
+        url_template = '/api/experimental/dags/{}/paused/{}'
+
+        response = self.app.get(
+            url_template.format('example_bash_operator', 'false')
+        )
+        self.assertIn('ok', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
     def test_trigger_dag(self):
         url_template = '/api/experimental/dags/{}/dag_runs'
         response = self.app.post(