You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/04 06:58:47 UTC
incubator-airflow git commit: [AIRFLOW-1899] Fix Kubernetes tests
Repository: incubator-airflow
Updated Branches:
refs/heads/master d1f7af393 -> 16bae5634
[AIRFLOW-1899] Fix Kubernetes tests
[AIRFLOW-1899] Add full deployment
- Made home directory configurable
- Documentation fix
- Add licenses
[AIRFLOW-1899] Tests for the Kubernetes Executor
Add an integration test for the Kubernetes
executor. Done by
spinning up different versions of kubernetes and
run a DAG
by invoking the REST API
Closes #3301 from Fokko/fix-kubernetes-executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16bae563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16bae563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16bae563
Branch: refs/heads/master
Commit: 16bae5634df24132b37eb752fe816f51bf7e83ca
Parents: d1f7af3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri May 4 08:58:12 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri May 4 08:58:12 2018 +0200
----------------------------------------------------------------------
.gitignore | 1 -
.travis.yml | 3 -
airflow/config_templates/default_airflow.cfg | 8 +
airflow/configuration.py | 9 +-
.../contrib/executors/kubernetes_executor.py | 80 ++---
airflow/contrib/kubernetes/kube_client.py | 4 +-
.../contrib/kubernetes/worker_configuration.py | 19 +-
airflow/jobs.py | 4 +-
airflow/www_rbac/api/experimental/endpoints.py | 27 +-
scripts/ci/kubernetes/docker/Dockerfile | 5 +
scripts/ci/kubernetes/docker/airflow-init.sh | 24 ++
scripts/ci/kubernetes/docker/build.sh | 11 +-
scripts/ci/kubernetes/kube/airflow.yaml | 102 +-----
scripts/ci/kubernetes/kube/configmaps.yaml | 359 +++++++++++++++++++
scripts/ci/kubernetes/kube/deploy.sh | 6 +
scripts/ci/kubernetes/kube/postgres.yaml | 1 +
scripts/ci/kubernetes/kube/secrets.yaml | 25 ++
scripts/ci/kubernetes/kube/volumes.yaml | 1 +
scripts/ci/travis_script.sh | 2 +-
setup.cfg | 13 +-
tests/cli/test_cli.py | 101 +++++-
tests/contrib/kubernetes/__init__.py | 14 -
tests/contrib/minikube/__init__.py | 18 +
.../minikube/test_kubernetes_executor.py | 97 +++++
.../minikube/test_kubernetes_pod_operator.py | 98 +++++
tests/contrib/minikube_tests/__init__.py | 18 -
.../minikube_tests/integration/__init__.py | 13 -
.../integration/airflow_controller.py | 166 ---------
.../test_kubernetes_executor_integration.py | 67 ----
.../test_kubernetes_pod_operator.py | 93 -----
.../www_rbac/api/experimental/test_endpoints.py | 21 +-
31 files changed, 866 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 04c408f..a42962f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,4 +137,3 @@ rat-results.txt
*.generated
*.tar.gz
scripts/ci/kubernetes/kube/.generated/airflow.yaml
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6d29a7a..77033df 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -80,9 +80,6 @@ matrix:
env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- python: "2.7"
env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
- allow_failures:
- - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
- - env: TOX_ENV=py35-backend_postgres KUBERNETES_VERSION=v1.9.0
cache:
directories:
- $HOME/.wheelhouse/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fa5eea0..6da4287 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -502,6 +504,7 @@ hide_sensitive_variable_fields = True
[elasticsearch]
elasticsearch_host =
+
[kubernetes]
# The repository and tag of the Kubernetes Image for the Worker to Run
worker_container_repository =
@@ -550,6 +553,11 @@ image_pull_secrets =
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =
+# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
+# It’s intended for clients that expect to be running inside a pod running on kubernetes.
+# It will raise an exception if called from a process not running in a kubernetes environment.
+in_cluster = True
+
[kubernetes_secrets]
# The scheduler mounts the following secrets into your workers as they are launched by the
# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 130356c..20ef067 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -442,7 +442,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
)
with open(AIRFLOW_CONFIG, 'w') as f:
cfg = parameterized_config(DEFAULT_CONFIG)
- f.write(cfg.split(TEMPLATE_START)[-1].strip())
+ cfg = cfg.split(TEMPLATE_START)[-1].strip()
+ if six.PY2:
+ cfg = cfg.encode('utf8')
+ f.write(cfg)
log.info("Reading the config from %s", AIRFLOW_CONFIG)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index cdce95f..17b2908 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ from uuid import uuid4
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
+from airflow.configuration import conf
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
@@ -87,20 +88,6 @@ class KubeConfig:
core_section = 'core'
kubernetes_section = 'kubernetes'
- @staticmethod
- def safe_get(section, option, default):
- try:
- return configuration.get(section, option)
- except AirflowConfigException:
- return default
-
- @staticmethod
- def safe_getboolean(section, option, default):
- try:
- return configuration.getboolean(section, option)
- except AirflowConfigException:
- return default
-
def __init__(self):
configuration_dict = configuration.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict['core']
@@ -114,40 +101,37 @@ class KubeConfig:
self.kubernetes_section, 'worker_container_tag')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
- self.delete_worker_pods = self.safe_getboolean(
- self.kubernetes_section, 'delete_worker_pods', True)
+ self.delete_worker_pods = conf.getboolean(
+ self.kubernetes_section, 'delete_worker_pods')
- self.worker_service_account_name = self.safe_get(
- self.kubernetes_section, 'worker_service_account_name', 'default')
- self.image_pull_secrets = self.safe_get(
- self.kubernetes_section, 'image_pull_secrets', '')
+ self.worker_service_account_name = conf.get(
+ self.kubernetes_section, 'worker_service_account_name')
+ self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
# NOTE: `git_repo` and `git_branch` must be specified together as a pair
# The http URL of the git repository to clone from
- self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+ self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
# The branch of the repository to be checked out
- self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+ self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
# Optionally, the directory in the git repository containing the dags
- self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
+ self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
# Optionally a user may supply a `git_user` and `git_password` for private
# repositories
- self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
- self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
+ self.git_user = conf.get(self.kubernetes_section, 'git_user')
+ self.git_password = conf.get(self.kubernetes_section, 'git_password')
# NOTE: The user may optionally use a volume claim to mount a PV containing
# DAGs directly
- self.dags_volume_claim = self.safe_get(self.kubernetes_section,
- 'dags_volume_claim', None)
+ self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
# This prop may optionally be set for PV Claims and is used to write logs
- self.logs_volume_claim = self.safe_get(
- self.kubernetes_section, 'logs_volume_claim', None)
+ self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
# This prop may optionally be set for PV Claims and is used to locate DAGs
# on a SubPath
- self.dags_volume_subpath = self.safe_get(
- self.kubernetes_section, 'dags_volume_subpath', None)
+ self.dags_volume_subpath = conf.get(
+ self.kubernetes_section, 'dags_volume_subpath')
# This prop may optionally be set for PV Claims and is used to write logs
self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
@@ -156,36 +140,32 @@ class KubeConfig:
# that if your
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
- self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
- 'default')
+ self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
# interact with cluster components.
- self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
- 'default')
+ self.executor_namespace = conf.get(self.kubernetes_section, 'namespace')
# Task secrets managed by KubernetesExecutor.
- self.gcp_service_account_keys = self.safe_get(
- self.kubernetes_section, 'gcp_service_account_keys', None)
+ self.gcp_service_account_keys = conf.get(self.kubernetes_section,
+ 'gcp_service_account_keys')
# If the user is using the git-sync container to clone their repository via git,
# allow them to specify repository, tag, and pod name for the init container.
- self.git_sync_container_repository = self.safe_get(
- self.kubernetes_section, 'git_sync_container_repository',
- 'gcr.io/google-containers/git-sync-amd64')
+ self.git_sync_container_repository = conf.get(
+ self.kubernetes_section, 'git_sync_container_repository')
- self.git_sync_container_tag = self.safe_get(
- self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
+ self.git_sync_container_tag = conf.get(
+ self.kubernetes_section, 'git_sync_container_tag')
self.git_sync_container = '{}:{}'.format(
self.git_sync_container_repository, self.git_sync_container_tag)
- self.git_sync_init_container_name = self.safe_get(
- self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
+ self.git_sync_init_container_name = conf.get(
+ self.kubernetes_section, 'git_sync_init_container_name')
# The worker pod may optionally have a valid Airflow config loaded via a
# configmap
- self.airflow_configmap = self.safe_get(self.kubernetes_section,
- 'airflow_configmap', None)
+ self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
self._validate()
@@ -272,7 +252,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
- self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
+ self.watcher_queue.put((pod_id, None, labels, resource_version))
elif status == 'Running':
self.log.info('Event: %s is Running', pod_id)
else:
@@ -552,7 +532,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
# always need to reset resource version since we don't know
# when we last started, note for behavior below
- # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod
+ # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
+ # /CoreV1Api.md#list_namespaced_pod
KubeResourceVersion.reset_resource_version(self._session)
self.task_queue = Queue()
self.result_queue = Queue()
@@ -610,8 +591,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
task_id=task_id,
execution_date=ex_time
).one()
-
- if item.state == State.RUNNING or item.state == State.QUEUED:
+ if state:
item.state = state
self._session.add(item)
self._session.commit()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index d1a63a2..1d3cc9d 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+from airflow.configuration import conf
def _load_kube_config(in_cluster):
from kubernetes import config, client
@@ -26,6 +26,6 @@ def _load_kube_config(in_cluster):
return client.CoreV1Api()
-def get_kube_client(in_cluster=True):
+def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')):
# TODO: This should also allow people to point to a cluster.
return _load_kube_config(in_cluster)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index cd2cb9f..ac4dacf 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -21,13 +21,18 @@ import six
from airflow.contrib.kubernetes.pod import Pod, Resources
from airflow.contrib.kubernetes.secret import Secret
+from airflow.utils.log.logging_mixin import LoggingMixin
-class WorkerConfiguration:
+class WorkerConfiguration(LoggingMixin):
"""Contains Kubernetes Airflow Worker configuration logic"""
def __init__(self, kube_config):
self.kube_config = kube_config
+ self.worker_airflow_home = self.kube_config.airflow_home
+ self.worker_airflow_dags = self.kube_config.dags_folder
+ self.worker_airflow_logs = self.kube_config.base_log_folder
+ super(WorkerConfiguration, self).__init__()
def _get_init_containers(self, volume_mounts):
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
@@ -79,7 +84,7 @@ class WorkerConfiguration:
'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
}
if self.kube_config.airflow_configmap:
- env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+ env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
return env
def _get_secrets(self):
@@ -129,19 +134,19 @@ class WorkerConfiguration:
volume_mounts = [{
'name': dags_volume_name,
'mountPath': os.path.join(
- self.kube_config.dags_folder,
+ self.worker_airflow_dags,
self.kube_config.git_subpath
),
'readOnly': True
}, {
'name': logs_volume_name,
- 'mountPath': self.kube_config.base_log_folder
+ 'mountPath': self.worker_airflow_logs
}]
# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = 'airflow-config'
- config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+ config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
volumes.append({
'name': config_volume_name,
'configMap': {
@@ -172,6 +177,10 @@ class WorkerConfiguration:
annotations = {
'iam.cloud.google.com/service-account': gcp_sa_key
} if gcp_sa_key else {}
+ airflow_command = airflow_command.replace("-sd", "-i -sd")
+ airflow_path = airflow_command.split('-sd')[-1]
+ airflow_path = self.worker_airflow_home + airflow_path.split('/')[-1]
+ airflow_command = airflow_command.split('-sd')[0] + '-sd ' + airflow_path
return Pod(
namespace=namespace,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ecbfef8..e5a07b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/airflow/www_rbac/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 5bc0529..bddf0c1 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,8 @@ from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils import timezone
from airflow.www_rbac.app import csrf
+from airflow import models
+from airflow.utils.db import create_session
from flask import g, Blueprint, jsonify, request, url_for
@@ -112,6 +114,27 @@ def task_info(dag_id, task_id):
return jsonify(fields)
+@api_experimental.route('/dags/<string:dag_id>/paused/<string:paused>', methods=['GET'])
+@requires_authentication
+def dag_paused(dag_id, paused):
+ """(Un)pauses a dag"""
+
+ DagModel = models.DagModel
+ with create_session() as session:
+ orm_dag = (
+ session.query(DagModel)
+ .filter(DagModel.dag_id == dag_id).first()
+ )
+ if paused == 'true':
+ orm_dag.is_paused = True
+ else:
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+
+ return jsonify({'response': 'ok'})
+
+
@api_experimental.route(
'/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
methods=['GET'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 967b698..6d2c62d 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
unzip \
&& apt-get clean
+
+RUN pip install --upgrade pip
+
# Since we install vanilla Airflow, we also want to have support for Postgres and Kubernetes
RUN pip install -U setuptools && \
pip install kubernetes && \
@@ -43,6 +46,8 @@ RUN pip install -U setuptools && \
COPY airflow.tar.gz /tmp/airflow.tar.gz
RUN pip install /tmp/airflow.tar.gz
+COPY airflow-init.sh /tmp/airflow-init.sh
+
COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
new file mode 100755
index 0000000..dc33625
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-init.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/docker/build.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh
index 6f14c4d..b93c6b1 100755
--- a/scripts/ci/kubernetes/docker/build.sh
+++ b/scripts/ci/kubernetes/docker/build.sh
@@ -27,7 +27,12 @@ if [ $? -eq 0 ]; then
eval $ENVCONFIG
fi
-cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \
-cd $DIRNAME && \
-docker build --pull $DIRNAME --tag=${IMAGE}:${TAG} && \
+echo "Airflow directory $AIRFLOW_ROOT"
+echo "Airflow Docker directory $DIRNAME"
+
+cd $AIRFLOW_ROOT
+python setup.py sdist -q
+echo "Copy distro $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz"
+cp $AIRFLOW_ROOT/dist/*.tar.gz ${DIRNAME}/airflow.tar.gz
+cd $DIRNAME && docker build --pull $DIRNAME --tag=${IMAGE}:${TAG}
rm $DIRNAME/airflow.tar.gz
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 77566ae..09bbcd8 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -61,7 +61,7 @@ spec:
- "bash"
args:
- "-cx"
- - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads"
+ - "./tmp/airflow-init.sh"
containers:
- name: webserver
image: airflow
@@ -88,20 +88,20 @@ spec:
mountPath: /root/airflow/dags
- name: airflow-logs
mountPath: /root/airflow/logs
- readinessProbe:
- initialDelaySeconds: 5
- timeoutSeconds: 5
- periodSeconds: 5
- httpGet:
- path: /admin
- port: 8080
- livenessProbe:
- initialDelaySeconds: 5
- timeoutSeconds: 5
- failureThreshold: 5
- httpGet:
- path: /admin
- port: 8080
+# readinessProbe:
+# initialDelaySeconds: 5
+# timeoutSeconds: 5
+# periodSeconds: 5
+# httpGet:
+# path: /login
+# port: 8080
+# livenessProbe:
+# initialDelaySeconds: 5
+# timeoutSeconds: 5
+# failureThreshold: 5
+# httpGet:
+# path: /login
+# port: 8080
- name: scheduler
image: airflow
imagePullPolicy: IfNotPresent
@@ -146,76 +146,4 @@ spec:
nodePort: 30809
selector:
name: airflow
----
-apiVersion: v1
-kind: Secret
-metadata:
- name: airflow-secrets
-type: Opaque
-data:
- # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
- # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
- sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
----
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: airflow-configmap
-data:
- airflow.cfg: |
- [core]
- airflow_home = /root/airflow
- dags_folder = /root/airflow/dags
- base_log_folder = /root/airflow/logs
- logging_level = INFO
- executor = KubernetesExecutor
- parallelism = 32
- plugins_folder = /root/airflow/plugins
- sql_alchemy_conn = $SQL_ALCHEMY_CONN
-
- [scheduler]
- dag_dir_list_interval = 300
- child_process_log_directory = /root/airflow/logs/scheduler
- # Task instances listen for external kill signal (when you clear tasks
- # from the CLI or the UI), this defines the frequency at which they should
- # listen (in seconds).
- job_heartbeat_sec = 5
- max_threads = 16
-
- # The scheduler constantly tries to trigger new tasks (look at the
- # scheduler section in the docs for more information). This defines
- # how often the scheduler should run (in seconds).
- scheduler_heartbeat_sec = 5
-
- # after how much time should the scheduler terminate in seconds
- # -1 indicates to run continuously (see also num_runs)
- run_duration = -1
-
- # after how much time a new DAGs should be picked up from the filesystem
- min_file_process_interval = 0
-
- statsd_on = False
- statsd_host = localhost
- statsd_port = 8125
- statsd_prefix = airflow
-
- # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
- min_file_parsing_loop_time = 1
-
- print_stats_interval = 30
- scheduler_zombie_task_threshold = 300
- max_tis_per_query = 0
- authenticate = False
-
- [kubernetes]
- airflow_configmap = airflow-configmap
- worker_container_repository = airflow
- worker_container_tag = latest
- delete_worker_pods = True
- git_repo = https://github.com/grantnicholas/testdags.git
- git_branch = master
- dags_volume_claim = airflow-dags
- logs_volume_claim = airflow-logs
- [kubernetes_secrets]
- SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/configmaps.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
new file mode 100644
index 0000000..ddba098
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -0,0 +1,359 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: airflow-configmap
+data:
+ airflow.cfg: |
+ [core]
+ airflow_home = /root/airflow
+ dags_folder = /root/airflow/dags
+ base_log_folder = /root/airflow/logs
+ logging_level = INFO
+ executor = KubernetesExecutor
+ parallelism = 32
+ load_examples = True
+ plugins_folder = /root/airflow/plugins
+ sql_alchemy_conn = $SQL_ALCHEMY_CONN
+
+ [scheduler]
+ dag_dir_list_interval = 300
+ child_process_log_directory = /root/airflow/logs/scheduler
+ # Task instances listen for external kill signal (when you clear tasks
+ # from the CLI or the UI), this defines the frequency at which they should
+ # listen (in seconds).
+ job_heartbeat_sec = 5
+ max_threads = 2
+
+ # The scheduler constantly tries to trigger new tasks (look at the
+ # scheduler section in the docs for more information). This defines
+ # how often the scheduler should run (in seconds).
+ scheduler_heartbeat_sec = 5
+
+ # after how much time should the scheduler terminate in seconds
+ # -1 indicates to run continuously (see also num_runs)
+ run_duration = -1
+
+ # after how much time a new DAGs should be picked up from the filesystem
+ min_file_process_interval = 0
+
+ statsd_on = False
+ statsd_host = localhost
+ statsd_port = 8125
+ statsd_prefix = airflow
+
+ # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+ min_file_parsing_loop_time = 1
+
+ print_stats_interval = 30
+ scheduler_zombie_task_threshold = 300
+ max_tis_per_query = 0
+ authenticate = False
+
+ # Turn off scheduler catchup by setting this to False.
+ # Default behavior is unchanged and
+ # Command Line Backfills still work, but the scheduler
+ # will not do scheduler catchup if this is False,
+ # however it can be set on a per DAG basis in the
+ # DAG definition (catchup)
+ catchup_by_default = True
+
+ [webserver]
+ # The base url of your website as airflow cannot guess what domain or
+ # cname you are using. This is used in automated emails that
+ # airflow sends to point links to the right web server
+ base_url = http://localhost:8080
+
+ # The ip specified when starting the web server
+ web_server_host = 0.0.0.0
+
+ # The port on which to run the web server
+ web_server_port = 8080
+
+ # Paths to the SSL certificate and key for the web server. When both are
+ # provided SSL will be enabled. This does not change the web server port.
+ web_server_ssl_cert =
+ web_server_ssl_key =
+
+ # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+ web_server_master_timeout = 120
+
+ # Number of seconds the gunicorn webserver waits before timing out on a worker
+ web_server_worker_timeout = 120
+
+ # Number of workers to refresh at a time. When set to 0, worker refresh is
+ # disabled. When nonzero, airflow periodically refreshes webserver workers by
+ # bringing up new ones and killing old ones.
+ worker_refresh_batch_size = 1
+
+ # Number of seconds to wait before refreshing a batch of workers.
+ worker_refresh_interval = 30
+
+ # Secret key used to run your flask app
+ secret_key = temporary_key
+
+ # Number of workers to run the Gunicorn web server
+ workers = 4
+
+ # The worker class gunicorn should use. Choices include
+ # sync (default), eventlet, gevent
+ worker_class = sync
+
+ # Log files for the gunicorn webserver. '-' means log to stderr.
+ access_logfile = -
+ error_logfile = -
+
+ # Expose the configuration file in the web server
+ expose_config = False
+
+ # Set to true to turn on authentication:
+ # https://airflow.incubator.apache.org/security.html#web-authentication
+ authenticate = False
+
+ # Filter the list of dags by owner name (requires authentication to be enabled)
+ filter_by_owner = False
+
+ # Filtering mode. Choices include user (default) and ldapgroup.
+ # Ldap group filtering requires using the ldap backend
+ #
+ # Note that the ldap server needs the "memberOf" overlay to be set up
+ # in order to user the ldapgroup mode.
+ owner_mode = user
+
+ # Default DAG view. Valid values are:
+ # tree, graph, duration, gantt, landing_times
+ dag_default_view = tree
+
+ # Default DAG orientation. Valid values are:
+ # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
+ dag_orientation = LR
+
+ # Puts the webserver in demonstration mode; blurs the names of Operators for
+ # privacy.
+ demo_mode = False
+
+ # The amount of time (in secs) webserver will wait for initial handshake
+ # while fetching logs from other worker machine
+ log_fetch_timeout_sec = 5
+
+ # By default, the webserver shows paused DAGs. Flip this to hide paused
+ # DAGs by default
+ hide_paused_dags_by_default = False
+
+ # Consistent page size across all listing views in the UI
+ page_size = 100
+
+ # Use FAB-based webserver with RBAC feature
+ rbac = True
+
+ [smtp]
+ # If you want airflow to send emails on retries, failure, and you want to use
+ # the airflow.utils.email.send_email_smtp function, you have to configure an
+ # smtp server here
+ smtp_host = localhost
+ smtp_starttls = True
+ smtp_ssl = False
+ # Uncomment and set the user/pass settings if you want to use SMTP AUTH
+ # smtp_user = airflow
+ # smtp_password = airflow
+ smtp_port = 25
+ smtp_mail_from = airflow@example.com
+
+ [kubernetes]
+ airflow_configmap = airflow-configmap
+ worker_container_repository = airflow
+ worker_container_tag = latest
+ delete_worker_pods = True
+ git_repo = https://github.com/apache/incubator-airflow.git
+ git_branch = master
+ git_subpath = airflow/example_dags/
+ git_user =
+ git_password =
+ dags_volume_claim = airflow-dags
+ logs_volume_claim = airflow-logs
+ in_cluster = True
+ namespace = default
+ gcp_service_account_keys =
+
+ # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
+ git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
+ git_sync_container_tag = v2.0.5
+ git_sync_init_container_name = git-sync-clone
+
+ [kubernetes_secrets]
+ SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
+
+ [hive]
+ # Default mapreduce queue for HiveOperator tasks
+ default_hive_mapred_queue =
+
+ [celery]
+ # This section only applies if you are using the CeleryExecutor in
+ # [core] section above
+
+ # The app name that will be used by celery
+ celery_app_name = airflow.executors.celery_executor
+
+ # The concurrency that will be used when starting workers with the
+ # "airflow worker" command. This defines the number of task instances that
+ # a worker will take, so size up your workers based on the resources on
+ # your worker box and the nature of your tasks
+ worker_concurrency = 16
+
+ # When you start an airflow worker, airflow starts a tiny web server
+ # subprocess to serve the workers local log files to the airflow main
+ # web server, who then builds pages and sends them to users. This defines
+ # the port on which the logs are served. It needs to be unused, and open
+ # visible from the main web server to connect into the workers.
+ worker_log_server_port = 8793
+
+ # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
+ # a sqlalchemy database. Refer to the Celery documentation for more
+ # information.
+ # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
+ broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+
+ # The Celery result_backend. When a job finishes, it needs to update the
+ # metadata of the job. Therefore it will post a message on a message bus,
+ # or insert it into a database (depending of the backend)
+ # This status is used by the scheduler to update the state of the task
+ # The use of a database is highly recommended
+ # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+ result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+
+ # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
+ # it `airflow flower`. This defines the IP that Celery Flower runs on
+ flower_host = 0.0.0.0
+
+ # The root URL for Flower
+ # Ex: flower_url_prefix = /flower
+ flower_url_prefix =
+
+ # This defines the port that Celery Flower runs on
+ flower_port = 5555
+
+ # Default queue that tasks get assigned to and that worker listen on.
+ default_queue = default
+
+ # Import path for celery configuration options
+ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+
+ [celery_broker_transport_options]
+ # The visibility timeout defines the number of seconds to wait for the worker
+ # to acknowledge the task before the message is redelivered to another worker.
+ # Make sure to increase the visibility timeout to match the time of the longest
+ # ETA you're planning to use. Especially important in case of using Redis or SQS
+ visibility_timeout = 21600
+
+ # In case of using SSL
+ ssl_active = False
+ ssl_key =
+ ssl_cert =
+ ssl_cacert =
+
+ [dask]
+ # This section only applies if you are using the DaskExecutor in
+ # [core] section above
+
+ # The IP address and port of the Dask cluster's scheduler.
+ cluster_address = 127.0.0.1:8786
+ # TLS/ SSL settings to access a secured Dask scheduler.
+ tls_ca =
+ tls_cert =
+ tls_key =
+
+ [ldap]
+ # set this to ldaps://<your.ldap.server>:<port>
+ uri =
+ user_filter = objectClass=*
+ user_name_attr = uid
+ group_member_attr = memberOf
+ superuser_filter =
+ data_profiler_filter =
+ bind_user = cn=Manager,dc=example,dc=com
+ bind_password = insecure
+ basedn = dc=example,dc=com
+ cacert = /etc/ca/ldap_ca.crt
+ search_scope = LEVEL
+
+ [mesos]
+ # Mesos master address which MesosExecutor will connect to.
+ master = localhost:5050
+
+ # The framework name which Airflow scheduler will register itself as on mesos
+ framework_name = Airflow
+
+ # Number of cpu cores required for running one task instance using
+ # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+ # command on a mesos slave
+ task_cpu = 1
+
+ # Memory in MB required for running one task instance using
+ # 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+ # command on a mesos slave
+ task_memory = 256
+
+ # Enable framework checkpointing for mesos
+ # See http://mesos.apache.org/documentation/latest/slave-recovery/
+ checkpoint = False
+
+ # Failover timeout in milliseconds.
+ # When checkpointing is enabled and this option is set, Mesos waits
+ # until the configured timeout for
+ # the MesosExecutor framework to re-register after a failover. Mesos
+ # shuts down running tasks if the
+ # MesosExecutor framework fails to re-register within this timeframe.
+ # failover_timeout = 604800
+
+ # Enable framework authentication for mesos
+ # See http://mesos.apache.org/documentation/latest/configuration/
+ authenticate = False
+
+ # Mesos credentials, if authentication is enabled
+ # default_principal = admin
+ # default_secret = admin
+
+ # Optional Docker Image to run on slave before running the command
+ # This image should be accessible from mesos slave i.e mesos slave
+ # should be able to pull this docker image before executing the command.
+ # docker_image_slave = puckel/docker-airflow
+
+ [kerberos]
+ ccache = /tmp/airflow_krb5_ccache
+ # gets augmented with fqdn
+ principal = airflow
+ reinit_frequency = 3600
+ kinit_path = kinit
+ keytab = airflow.keytab
+
+ [cli]
+ api_client = airflow.api.client.json_client
+ endpoint_url = http://localhost:8080
+
+ [api]
+ auth_backend = airflow.api.auth.backend.default
+
+ [github_enterprise]
+ api_rev = v3
+
+ [admin]
+ # UI to hide sensitive variable fields when set to True
+ hide_sensitive_variable_fields = True
+
+ [elasticsearch]
+ elasticsearch_host =
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index a5adcf8..e585d87 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -21,8 +21,14 @@ IMAGE=${1:-airflow/ci}
TAG=${2:-latest}
DIRNAME=$(cd "$(dirname "$0")"; pwd)
+kubectl delete -f $DIRNAME/postgres.yaml
+kubectl delete -f $DIRNAME/airflow.yaml
+kubectl delete -f $DIRNAME/secrets.yaml
+
kubectl apply -f $DIRNAME/postgres.yaml
kubectl apply -f $DIRNAME/volumes.yaml
+kubectl apply -f $DIRNAME/secrets.yaml
+kubectl apply -f $DIRNAME/configmaps.yaml
kubectl apply -f $DIRNAME/airflow.yaml
# wait for up to 10 minutes for everything to be deployed
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
index 67a0635..1130921 100644
--- a/scripts/ci/kubernetes/kube/postgres.yaml
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -30,6 +30,7 @@ spec:
containers:
- name: postgres
image: postgres
+ imagePullPolicy: IfNotPresent
ports:
- containerPort: 5432
protocol: TCP
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/secrets.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/secrets.yaml b/scripts/ci/kubernetes/kube/secrets.yaml
new file mode 100644
index 0000000..4d533b3
--- /dev/null
+++ b/scripts/ci/kubernetes/kube/secrets.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License. *
+apiVersion: v1
+kind: Secret
+metadata:
+ name: airflow-secrets
+type: Opaque
+data:
+ # The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
+ # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+ sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 073e98c..58ad368 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,3 +62,4 @@ spec:
resources:
requests:
storage: 2Gi
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 8766e94..52571cc 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -26,7 +26,7 @@ then
tox -e $TOX_ENV
else
KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
- tox -e $TOX_ENV -- tests.contrib.minikube_tests \
+ tox -e $TOX_ENV -- tests.contrib.minikube \
--with-coverage \
--cover-erase \
--cover-html \
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/setup.cfg
----------------------------------------------------------------------
diff --git a/setup.cfg b/setup.cfg
index 7b3479c..622cc13 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -6,16 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
[metadata]
name = Airflow
summary = Airflow is a system to programmatically author, schedule and monitor data pipelines.
@@ -29,8 +27,11 @@ packages = airflow
[build_sphinx]
source-dir = docs/
-build-dir = docs/_build
-all_files = 1
+build-dir = docs/_build
+all_files = 1
[upload_sphinx]
upload-dir = docs/_build/html
+
+[easy_install]
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/cli/test_cli.py
----------------------------------------------------------------------
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 9c79567..34c82bc 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,15 +22,84 @@ import unittest
from mock import patch, Mock, MagicMock
from time import sleep
-
import psutil
-
+from argparse import Namespace
from airflow import settings
-from airflow.bin.cli import get_num_ready_workers_running
+from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.settings import Session
+from airflow import models
+
+import os
+
+dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1])
+
+TEST_DAG_FOLDER = os.path.join(
+ os.path.dirname(dag_folder_path), 'dags')
+TEST_DAG_ID = 'unit_tests'
+
+
+def reset(dag_id):
+ session = Session()
+ tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+ tis.delete()
+ session.commit()
+ session.close()
+
+
+def create_mock_args(
+ task_id,
+ dag_id,
+ subdir,
+ execution_date,
+ task_params=None,
+ dry_run=False,
+ queue=None,
+ pool=None,
+ priority_weight_total=None,
+ retries=0,
+ local=True,
+ mark_success=False,
+ ignore_all_dependencies=False,
+ ignore_depends_on_past=False,
+ ignore_dependencies=False,
+ force=False,
+ run_as_user=None,
+ executor_config={},
+ cfg_path=None,
+ pickle=None,
+ raw=None,
+ interactive=None,
+):
+ args = MagicMock(spec=Namespace)
+ args.task_id = task_id
+ args.dag_id = dag_id
+ args.subdir = subdir
+ args.task_params = task_params
+ args.execution_date = execution_date
+ args.dry_run = dry_run
+ args.queue = queue
+ args.pool = pool
+ args.priority_weight_total = priority_weight_total
+ args.retries = retries
+ args.local = local
+ args.run_as_user = run_as_user
+ args.executor_config = executor_config
+ args.cfg_path = cfg_path
+ args.pickle = pickle
+ args.raw = raw
+ args.mark_success = mark_success
+ args.ignore_all_dependencies = ignore_all_dependencies
+ args.ignore_depends_on_past = ignore_depends_on_past
+ args.ignore_dependencies = ignore_dependencies
+ args.force = force
+ args.interactive = interactive
+ return args
class TestCLI(unittest.TestCase):
-
def setUp(self):
self.gunicorn_master_proc = Mock(pid=None)
self.children = MagicMock()
@@ -74,3 +143,23 @@ class TestCLI(unittest.TestCase):
"webserver terminated with return code {} in debug mode".format(return_code))
p.terminate()
p.wait()
+
+ def test_local_run(self):
+ args = create_mock_args(
+ task_id='print_the_context',
+ dag_id='example_python_operator',
+ subdir='/root/dags/example_python_operator.py',
+ interactive=True,
+ execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00')
+ )
+
+ reset(args.dag_id)
+
+ with patch('argparse.Namespace', args) as mock_args:
+ run(mock_args)
+ dag = get_dag(mock_args)
+ task = dag.get_task(task_id=args.task_id)
+ ti = TaskInstance(task, args.execution_date)
+ ti.refresh_from_db()
+ state = ti.current_state()
+ self.assertEqual(state, State.SUCCESS)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py
deleted file mode 100644
index 759b563..0000000
--- a/tests/contrib/kubernetes/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/__init__.py b/tests/contrib/minikube/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/contrib/minikube/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
new file mode 100644
index 0000000..9827bc8
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from subprocess import check_call, check_output
+
+import requests
+import time
+import six
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except Exception as e:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster;"
+ "Skipping tests {}".format(e)
+ )
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+
+ def test_integration_run_dag(self):
+ host_ip = check_output(['minikube', 'ip'])
+ if six.PY3:
+ host_ip = host_ip.decode('UTF-8')
+ host = '{}:30809'.format(host_ip.strip())
+
+ # Enable the dag
+ result = requests.get(
+ 'http://{}/api/experimental/'
+ 'dags/example_python_operator/paused/false'.format(host)
+ )
+ self.assertEqual(result.status_code, 200, "Could not enable DAG")
+
+ # Trigger a new dagrun
+ result = requests.post(
+ 'http://{}/api/experimental/'
+ 'dags/example_python_operator/dag_runs'.format(host),
+ json={}
+ )
+ self.assertEqual(result.status_code, 200, "Could not trigger a DAG-run")
+
+ time.sleep(1)
+
+ result = requests.get(
+ 'http://{}/api/experimental/latest_runs'.format(host)
+ )
+ self.assertEqual(result.status_code, 200, "Could not get the latest DAG-run")
+ result_json = result.json()
+
+ self.assertGreater(len(result_json['items']), 0)
+
+ execution_date = result_json['items'][0]['execution_date']
+ print("Found the job with execution date {}".format(execution_date))
+
+ tries = 0
+ state = ''
+ # Wait 100 seconds for the operator to complete
+ while tries < 20:
+ time.sleep(5)
+
+ # Trigger a new dagrun
+ result = requests.get(
+ 'http://{}/api/experimental/dags/example_python_operator/'
+ 'dag_runs/{}/tasks/print_the_context'.format(host, execution_date)
+ )
+ self.assertEqual(result.status_code, 200, "Could not get the status")
+ result_json = result.json()
+ state = result_json['state']
+ print("Attempt {}: Current state of operator is {}".format(tries, state))
+
+ if state == 'success':
+ break
+ tries += 1
+
+ self.assertEqual(state, 'success')
+
+ # Maybe check if we can retrieve the logs, but then we need to extend the API
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..081fc04
--- /dev/null
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+import mock
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+
+try:
+ check_call(["kubectl", "get", "pods"])
+except Exception as e:
+ raise unittest.SkipTest(
+ "Kubernetes integration tests require a minikube cluster;"
+ "Skipping tests {}".format(e)
+ )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+ def test_working_pod(self):
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+ k.execute(None)
+
+ def test_logging(self):
+ with mock.patch.object(PodLauncher, 'log') as mock_logger:
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ get_logs=True
+ )
+ k.execute(None)
+ mock_logger.info.assert_any_call(b"+ echo\n")
+
+ def test_faulty_image(self):
+ bad_image_name = "foobar"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image=bad_image_name,
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5
+ )
+ with self.assertRaises(AirflowException) as cm:
+ k.execute(None),
+
+ print("exception: {}".format(cm))
+
+ def test_pod_failure(self):
+ """
+ Tests that the task fails when a pod reports a failure
+ """
+ bad_internal_command = "foobar"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=[bad_internal_command, "10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task"
+ )
+ with self.assertRaises(AirflowException):
+ k.execute(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
deleted file mode 100644
index 4067cc7..0000000
--- a/tests/contrib/minikube_tests/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/minikube_tests/integration/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py
deleted file mode 100644
index 5604652..0000000
--- a/tests/contrib/minikube_tests/integration/airflow_controller.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import subprocess
-import time
-
-
-class RunCommandError(Exception):
- pass
-
-
-class TimeoutError(Exception):
- pass
-
-
-class DagRunState:
- SUCCESS = "success"
- FAILED = "failed"
- RUNNING = "running"
-
-
-def run_command(command):
- process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = process.communicate()
- if process.returncode != 0:
- raise RunCommandError(
- "Error while running command: {}; Stdout: {}; Stderr: {}".format(
- command, stdout, stderr
- ))
- return stdout, stderr
-
-
-def run_command_in_pod(pod_name, container_name, command):
- return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format(
- pod_name=pod_name, container_name=container_name, command=command
- ))
-
-
-def get_scheduler_logs(airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
-
- return run_command("kubectl logs {pod_name} scheduler"
- .format(pod_name=airflow_pod))
-
-
-def _unpause_dag(dag_id, airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
- return run_command_in_pod(airflow_pod, "scheduler",
- "airflow unpause {dag_id}".format(dag_id=dag_id))
-
-
-def run_dag(dag_id, run_id, airflow_pod=None):
- airflow_pod = airflow_pod or _get_airflow_pod()
- _unpause_dag(dag_id, airflow_pod)
- return run_command_in_pod(airflow_pod, "scheduler",
- "airflow trigger_dag {dag_id} -r {run_id}".format(
- dag_id=dag_id, run_id=run_id
- ))
-
-
-def _get_pod_by_grep(grep_phrase):
- stdout, stderr = run_command(
- "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
- grep_phrase=grep_phrase
- ))
- pod_name = stdout.strip()
- return pod_name
-
-
-def _get_airflow_pod():
- return _get_pod_by_grep("^airflow")
-
-
-def _get_postgres_pod():
- return _get_pod_by_grep("^postgres")
-
-
-def _parse_state(stdout):
- end_line = "(1 row)"
- prev_line = None
- for line in stdout.split("\n"):
- if end_line in line:
- return prev_line.strip()
- prev_line = line
-
- raise Exception("Unknown psql output: {}".format(stdout))
-
-
-def get_dag_run_table(postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select * from dag_run" """
- )
- return stdout
-
-
-def get_task_instance_table(postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select * from task_instance" """
- )
- return stdout
-
-
-def get_dag_run_state(dag_id, run_id, postgres_pod=None):
- postgres_pod = postgres_pod or _get_postgres_pod()
- stdout, stderr = run_command_in_pod(
- postgres_pod, "postgres",
- """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and
- run_id='{run_id}'" """.format(
- dag_id=dag_id, run_id=run_id
- )
- )
- return _parse_state(stdout)
-
-
-def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
- postgres_pod = postgres_pod or _get_postgres_pod()
- for _ in range(0, timeout / poll_interval):
- dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
- if dag_state != DagRunState.RUNNING:
- capture_logs_for_failure(dag_state)
- return dag_state
- time.sleep(poll_interval)
-
- raise TimeoutError(
- "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id,
- run_id))
-
-
-def _kill_pod(pod_name):
- return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name))
-
-
-def kill_scheduler():
- airflow_pod = _get_pod_by_grep("^airflow")
- return _kill_pod(airflow_pod)
-
-
-def capture_logs_for_failure(state):
- if state != DagRunState.SUCCESS:
- stdout, stderr = get_scheduler_logs()
- print("stdout:")
- for line in stdout.split('\n'):
- print(line)
- print("stderr:")
- for line in stderr.split('\n'):
- print(line)
- print("dag_run:")
- print(get_dag_run_table())
- print("task_instance")
- print(get_task_instance_table())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
deleted file mode 100644
index 602a717..0000000
--- a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import unittest
-from uuid import uuid4
-
-from tests.contrib.minikube_tests.integration.airflow_controller\
- import DagRunState, RunCommandError, \
- dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag
-
-try:
- run_command("kubectl get pods")
-except RunCommandError:
- SKIP_KUBE = True
-else:
- SKIP_KUBE = False
-
-
-class KubernetesExecutorTest(unittest.TestCase):
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_kubernetes_executor_dag_runs_successfully(self):
- dag_id, run_id = "example_python_operator", uuid4().hex
- run_dag(dag_id, run_id)
- state = dag_final_state(dag_id, run_id, timeout=120)
- self.assertEquals(state, DagRunState.SUCCESS)
-
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
- dag_id, run_id = "example_python_operator", uuid4().hex
- run_dag(dag_id, run_id)
-
- self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-
- time.sleep(10)
-
- kill_scheduler()
-
- self.assertEquals(dag_final_state(dag_id, run_id, timeout=180),
- DagRunState.SUCCESS)
-
- @unittest.skipIf(SKIP_KUBE,
- 'Kubernetes integration tests are unsupported by this configuration')
- def test_kubernetes_executor_config_works(self):
- dag_id, run_id = "example_kubernetes_executor", uuid4().hex
- run_dag(dag_id, run_id)
-
- self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
- self.assertEquals(dag_final_state(dag_id, run_id, timeout=300),
- DagRunState.SUCCESS)
-
-
-if __name__ == "__main__":
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
deleted file mode 100644
index 321f01f..0000000
--- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-from subprocess import check_call
-import mock
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-
-try:
- check_call(["kubectl", "get", "pods"])
-except Exception as e:
- raise unittest.SkipTest(
- "Kubernetes integration tests require a minikube cluster;"
- "Skipping tests {}".format(e)
- )
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
- def test_working_pod(self):
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- k.execute(None)
-
- def test_logging(self):
- with mock.patch.object(PodLauncher, 'log') as mock_logger:
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- get_logs=True
- )
- k.execute(None)
- mock_logger.info.assert_any_call("+ echo\n")
-
- def test_faulty_image(self):
- bad_image_name = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image=bad_image_name,
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- startup_timeout_seconds=5
- )
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
-
- print("exception: {}".format(cm))
-
- def test_pod_failure(self):
- """
- Tests that the task fails when a pod reports a failure
- """
-
- bad_internal_command = "foobar"
- k = KubernetesPodOperator(namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task"
- )
-
- with self.assertRaises(AirflowException):
- k.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16bae563/tests/www_rbac/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/api/experimental/test_endpoints.py b/tests/www_rbac/api/experimental/test_endpoints.py
index 7bcbb8e..a19492e 100644
--- a/tests/www_rbac/api/experimental/test_endpoints.py
+++ b/tests/www_rbac/api/experimental/test_endpoints.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -77,6 +77,23 @@ class TestApiExperimental(unittest.TestCase):
self.assertIn('error', response.data.decode('utf-8'))
self.assertEqual(404, response.status_code)
+ def test_task_paused(self):
+ url_template = '/api/experimental/dags/{}/paused/{}'
+
+ response = self.app.get(
+ url_template.format('example_bash_operator', 'true')
+ )
+ self.assertIn('ok', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
+ url_template = '/api/experimental/dags/{}/paused/{}'
+
+ response = self.app.get(
+ url_template.format('example_bash_operator', 'false')
+ )
+ self.assertIn('ok', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
def test_trigger_dag(self):
url_template = '/api/experimental/dags/{}/dag_runs'
response = self.app.post(