Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 18:03:08 UTC

[08/16] incubator-airflow git commit: [AIRFLOW-1517] Add minikube for kubernetes integration tests

[AIRFLOW-1517] Add minikube for kubernetes integration tests

Add better support for minikube integration tests. By default, the minikube integration tests will run against Kubernetes 1.7 and Kubernetes 1.8.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/965439be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/965439be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/965439be

Branch: refs/heads/master
Commit: 965439bef04de4744796c8516ad8ff7e548639e5
Parents: a42dbb4
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Wed Dec 27 14:21:20 2017 -0600
Committer: Daniel Imberman <da...@gmail.com>
Committed: Thu Jan 11 15:29:16 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  9 +++
 airflow/contrib/kubernetes/kube_client.py       |  8 +-
 .../ci/kubernetes/minikube/start_minikube.sh    | 58 +++++++++++----
 scripts/ci/run_tests.sh                         |  2 +-
 scripts/ci/travis_script.sh                     |  7 +-
 tests/contrib/minikube_tests/__init__.py        | 13 ++++
 .../test_kubernetes_pod_operator.py             | 78 ++++++++++++++++++++
 .../operators/test_kubernetes_pod_operator.py   | 69 -----------------
 8 files changed, 150 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6b45153..dec9181 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,6 +54,8 @@ env:
     - TOX_ENV=py35-backend_sqlite
     - TOX_ENV=py35-backend_postgres
     - TOX_ENV=flake8
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
 matrix:
   exclude:
     - python: "3.5"
@@ -70,6 +72,13 @@ matrix:
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
       env: TOX_ENV=flake8
+    - python: "3.5"  
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - python: "3.5"
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 
+  allow_failures:
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0  
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index cd68caf..ecb3d55 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -16,17 +16,15 @@
 # under the License.
 
 
-def load_kube_config(in_cluster=True):
+def _load_kube_config(in_cluster):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
+        return client.CoreV1Api()
     else:
         config.load_kube_config()
         return client.CoreV1Api()
 
 def get_kube_client(in_cluster=True):
     # TODO: This should also allow people to point to a cluster.
-
-    from kubernetes import client
-    load_kube_config(in_cluster)
-    return client.CoreV1Api()
+    return _load_kube_config(in_cluster)

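The refactor above folds client construction into the config-loading helper, so get_kube_client() now returns a ready CoreV1Api instance for both the in-cluster and kubeconfig paths. A minimal usage sketch (assuming the kubernetes Python package is installed and a minikube kubeconfig is available; the namespace below is illustrative):

    from airflow.contrib.kubernetes.kube_client import get_kube_client

    # Loads ~/.kube/config (or the in-cluster config) and returns a CoreV1Api client.
    kube = get_kube_client(in_cluster=False)

    # List pods the way a scheduler component might; "default" is illustrative.
    for pod in kube.list_namespaced_pod("default").items:
        print(pod.metadata.name, pod.status.phase)
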
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index f78cb3a..1da23d0 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -15,8 +15,8 @@
 #  specific language governing permissions and limitations      *
 #  under the License.                                           *
 
-# Guard against a kubernetes cluster already being up
 #!/usr/bin/env bash
+# Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
 if [ $? -eq 0 ]; then
   echo "kubectl get pods returned 0 exit code, exiting early"
@@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -39,15 +39,43 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
-  echo "------- Running kubectl get pods -------"
-  kubectl get po &> /dev/null
-  if [ $? -ne 1 ]; then
-    break
-  fi
-  sleep 2
-done
+
+start_minikube(){
+  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+  # this for loop waits until kubectl can access the api server that minikube has created
+  for i in {1..90} # timeout 3 minutes
+  do
+    echo "------- Running kubectl get pods -------"
+    STDERR=$(kubectl get pods  2>&1 >/dev/null)
+    if [ $? -ne 1 ]; then
+      echo $STDERR
+
+      # We do not need dynamic hostpath provisioning, so disable the default storageclass
+      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+      # We need to give permission to watch pods to the airflow scheduler. 
+      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
+      exit 0
+    fi
+    echo $STDERR
+    sleep 2
+  done
+}
+
+cleanup_minikube(){
+  sudo -E minikube stop
+  sudo -E minikube delete
+  docker stop $(docker ps -a -q) || true
+  docker rm $(docker ps -a -q) || true
+  sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1

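For reference, the wait loop above simply polls kubectl get pods until the API server answers; if it never does, the script tears the cluster down and retries once before failing. A rough Python equivalent of the polling step (a sketch only, assuming kubectl is on the PATH and already pointed at the minikube cluster; the helper name and return convention are illustrative):

    import os
    import subprocess
    import time

    def wait_for_apiserver(attempts=90, delay=2):
        # Mirrors the shell loop: 90 attempts, 2 seconds apart (~3 minutes).
        with open(os.devnull, "w") as devnull:
            for _ in range(attempts):
                rc = subprocess.call(["kubectl", "get", "pods"],
                                     stdout=devnull, stderr=devnull)
                if rc == 0:
                    return True   # API server is reachable
                time.sleep(delay)
        return False              # caller can clean up and retry, as the script does
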
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 1253686..8c47ee8 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -44,5 +44,5 @@ fi
 
 if [[ "$SKIP_TESTS" != "true" ]]; then
     echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
-    ./run_unit_tests.sh
+    ./run_unit_tests.sh $@
 fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index a51e742..86c086a 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
 then
-  $DIRNAME/kubernetes/setup_kubernetes.sh
   tox -e $TOX_ENV
 else
-  $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..5cdd819
--- /dev/null
+++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+from subprocess import check_call
+
+
+try:
+    check_call(["kubectl", "get", "pods"])
+except:
+    raise unittest.SkipTest(
+        "Kubernetes integration tests require a minikube cluster; Skipping tests"
+    )
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None),
+
+        print("exception: {}".format(cm))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)

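Beyond the integration tests, the operator exercised above can be dropped into a DAG in the usual way. A minimal sketch that mirrors the arguments from test_working_pod (it assumes a reachable cluster; the dag_id, start_date, and schedule are illustrative and not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    dag = DAG(dag_id="kubernetes_pod_example",
              start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    # Runs `bash -cx echo 10` inside an ubuntu:16.04 pod in the default namespace.
    echo_task = KubernetesPodOperator(namespace="default",
                                      image="ubuntu:16.04",
                                      cmds=["bash", "-cx"],
                                      arguments=["echo", "10"],
                                      labels={"foo": "bar"},
                                      name="airflow-test-pod",
                                      task_id="echo",
                                      dag=dag)
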
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/965439be/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
deleted file mode 100644
index 205f183..0000000
--- a/tests/contrib/operators/test_kubernetes_pod_operator.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from airflow import AirflowException
-
-
-class KubernetesPodOperatorTest(unittest.TestCase):
-
-    def test_working_pod(self):
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        k.execute(None)
-
-    def test_faulty_image(self):
-        bad_image_name = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image=bad_image_name,
-                                  cmds=["bash", "-cx"],
-                                  arguments=["echo", "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task",
-                                  startup_timeout_seconds=5
-                                  )
-        with self.assertRaises(AirflowException) as cm:
-            k.execute(None),
-
-        print("exception: {}".format(cm))
-
-    def test_pod_failure(self):
-        """
-            Tests that the task fails when a pod reports a failure
-        """
-
-        bad_internal_command = "foobar"
-        k = KubernetesPodOperator(namespace='default',
-                                  image="ubuntu:16.04",
-                                  cmds=["bash", "-cx"],
-                                  arguments=[bad_internal_command, "10"],
-                                  labels={"foo": "bar"},
-                                  name="test",
-                                  task_id="task"
-                                  )
-
-        with self.assertRaises(AirflowException):
-            k.execute(None)