You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/14 19:58:59 UTC

incubator-airflow git commit: [AIRFLOW-2460] Users can now use volume mounts and volumes when launching pods using k8s operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6c19468e0 -> 0b6a7000c


[AIRFLOW-2460] Users can now use volume mounts and volumes when
launching pods using k8s operator

Closes #3356 from dimberman/k8s-mounts


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0b6a7000
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0b6a7000
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0b6a7000

Branch: refs/heads/master
Commit: 0b6a7000c063d8bf744a1b907f4e1d7c04f57e57
Parents: 6c19468
Author: Daniel Imberman <da...@gmail.com>
Authored: Mon May 14 21:58:39 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon May 14 21:58:39 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/kubernetes/pod_generator.py     | 43 ++++++++++++++++----
 airflow/contrib/kubernetes/volume.py            | 33 +++++++++++++++
 airflow/contrib/kubernetes/volume_mount.py      | 37 +++++++++++++++++
 .../operators/kubernetes_pod_operator.py        | 24 +++++++++--
 docs/kubernetes.rst                             | 15 ++++++-
 scripts/ci/kubernetes/docker/Dockerfile         |  2 +-
 scripts/ci/kubernetes/docker/airflow-init.sh    | 24 -----------
 .../kubernetes/docker/airflow-test-env-init.sh  | 25 ++++++++++++
 scripts/ci/kubernetes/kube/airflow.yaml         |  7 +++-
 scripts/ci/kubernetes/kube/volumes.yaml         | 24 ++++++++++-
 .../minikube/test_kubernetes_pod_operator.py    | 40 +++++++++++++++---
 11 files changed, 230 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 78d3926..0f9dabd 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -19,6 +19,8 @@ import os
 
 from airflow.contrib.kubernetes.pod import Pod
 import uuid
+from airflow.contrib.kubernetes.volume_mount import VolumeMount  # noqa
+from airflow.contrib.kubernetes.volume import Volume  # noqa
 
 
 class PodGenerator:
@@ -64,16 +66,30 @@ class PodGenerator:
     def _get_init_containers(self):
         return self.init_containers
 
-    def add_volume(self, name):
+    def add_volume(self, volume):
+        """Add a pod-level volume described by a Volume object.
+        Args:
+            volume (Volume): its name and configs are forwarded to _add_volume
+        """
+
+        self._add_volume(name=volume.name, configs=volume.configs)
+
+    def _add_volume(self, name, configs):
         """
 
         Args:
             name (str):
+            configs (dict): keys copied verbatim into the volume entry, e.g. a
+                persistentVolumeClaim source; a 'name' key here overrides name.
 
         Returns:
 
         """
-        self.volumes.append({'name': name})
+        volume_map = {'name': name}
+        for k, v in configs.items():
+            volume_map[k] = v
+
+        self.volumes.append(volume_map)
 
     def add_volume_with_configmap(self, name, config_map):
         self.volumes.append(
@@ -83,11 +99,11 @@ class PodGenerator:
             }
         )
 
-    def add_mount(self,
-                  name,
-                  mount_path,
-                  sub_path,
-                  read_only):
+    def _add_mount(self,
+                   name,
+                   mount_path,
+                   sub_path,
+                   read_only):
         """
 
         Args:
@@ -107,6 +123,19 @@ class PodGenerator:
             'readOnly': read_only
         })
 
+    def add_mount(self,
+                  volume_mount):
+        """Add a volume mount described by a VolumeMount object.
+        Args:
+            volume_mount (VolumeMount): its fields are forwarded to _add_mount
+        """
+        self._add_mount(
+            name=volume_mount.name,
+            mount_path=volume_mount.mount_path,
+            sub_path=volume_mount.sub_path,
+            read_only=volume_mount.read_only
+        )
+
     def _get_volumes_and_mounts(self):
         return self.volumes, self.volume_mounts
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/volume.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/volume.py b/airflow/contrib/kubernetes/volume.py
new file mode 100644
index 0000000..d5b4f60
--- /dev/null
+++ b/airflow/contrib/kubernetes/volume.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+class Volume:
+    """Holds the name and configs of a Kubernetes pod volume"""
+
+    def __init__(self, name, configs):
+        """Initialize a Kubernetes Volume. Lets the pod access features like ConfigMaps
+        and Persistent Volumes.
+        :param name: the name of the volume; a VolumeMount with this name mounts it
+        :type name: str
+        :param configs: volume-source settings merged into the pod's volume entry,
+            e.g. ``{'persistentVolumeClaim': {'claimName': ...}}``. Kept generic on
+            purpose since each volume type (PVC, ConfigMap, ...) takes different keys.
+        :type configs: dict
+        """
+        self.name = name
+        self.configs = configs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/volume_mount.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/volume_mount.py b/airflow/contrib/kubernetes/volume_mount.py
new file mode 100644
index 0000000..4bdf09c
--- /dev/null
+++ b/airflow/contrib/kubernetes/volume_mount.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+class VolumeMount:
+    """Defines where a pod-level Kubernetes volume is mounted in a container"""
+
+    def __init__(self, name, mount_path, sub_path, read_only):
+        """Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to
+        the running container.
+        :param name: name of the pod-level volume to mount (should match a Volume name)
+        :type name: str
+        :param mount_path: path inside the container at which the volume is mounted
+        :type mount_path: str
+        :param sub_path: sub-path within the volume to mount, or None for its root
+        :type sub_path: str
+        :param read_only: whether the volume is mounted read-only
+        :type read_only: bool
+        """
+        self.name = name
+        self.mount_path = mount_path
+        self.sub_path = sub_path
+        self.read_only = read_only

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 0f9c943..ffdc2cf 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -21,6 +21,9 @@ from airflow.utils.decorators import apply_defaults
 from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
 from airflow.contrib.kubernetes.pod import Resources
 from airflow.utils.state import State
+from airflow.contrib.kubernetes.volume_mount import VolumeMount  # noqa
+from airflow.contrib.kubernetes.volume import Volume  # noqa
+from airflow.contrib.kubernetes.secret import Secret  # noqa
 
 template_fields = ('templates_dict',)
 template_ext = tuple()
@@ -37,10 +40,14 @@ class KubernetesPodOperator(BaseOperator):
     :type: namespace: str
     :param cmds: entrypoint of the container.
         The docker images's entrypoint is used if this is not provide.
-    :type cmds: list
+    :type cmds: list of str
     :param arguments: arguments of to the entrypoint.
         The docker image's CMD is used if this is not provided.
-    :type arguments: list
+    :type arguments: list of str
+    :param volume_mounts: volumeMounts for launched pod
+    :type volume_mounts: list of VolumeMount
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
+    :type volumes: list of Volume
     :param labels: labels to apply to the Pod
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod
@@ -52,7 +59,7 @@ class KubernetesPodOperator(BaseOperator):
     :type env_vars: dict
     :param secrets: Kubernetes secrets to inject in the container,
         They can be exposed as environment vars or files in a volume.
-    :type secrets: list
+    :type secrets: list of Secret
     :param in_cluster: run kubernetes client with in_cluster configuration
     :type in_cluster: bool
     :param get_logs: get the stdout of the container as logs of the tasks
@@ -65,13 +72,18 @@ class KubernetesPodOperator(BaseOperator):
             client = kube_client.get_kube_client(in_cluster=self.in_cluster)
             gen = pod_generator.PodGenerator()
 
+            for mount in self.volume_mounts:
+                gen.add_mount(mount)
+            for volume in self.volumes:
+                gen.add_volume(volume)
+
             pod = gen.make_pod(
                 namespace=self.namespace,
                 image=self.image,
                 pod_id=self.name,
                 cmds=self.cmds,
                 arguments=self.arguments,
-                labels=self.labels
+                labels=self.labels,
             )
 
             pod.secrets = self.secrets
@@ -99,6 +111,8 @@ class KubernetesPodOperator(BaseOperator):
                  name,
                  cmds=None,
                  arguments=None,
+                 volume_mounts=None,
+                 volumes=None,
                  env_vars=None,
                  secrets=None,
                  in_cluster=False,
@@ -119,6 +133,8 @@ class KubernetesPodOperator(BaseOperator):
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
         self.env_vars = env_vars or {}
+        self.volume_mounts = volume_mounts or []
+        self.volumes = volumes or []
         self.secrets = secrets or []
         self.in_cluster = in_cluster
         self.get_logs = get_logs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 4b83fc0..cb0ad87 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -19,13 +19,26 @@ Kubernetes Operator
 
     secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
     secret_env  = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-
+    volume_mount = VolumeMount('test-volume',
+                                mount_path='/root/mount_file',
+                                sub_path=None,
+                                read_only=True)
+
+    volume_config = {
+        'persistentVolumeClaim':
+            {
+                'claimName': 'test-volume'
+            }
+    }
+    volume = Volume(name='test-volume', configs=volume_config)
     k = KubernetesPodOperator(namespace='default',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "10"],
                               labels={"foo": "bar"},
                               secrets=[secret_file, secret_env],
+                              volumes=[volume],
+                              volume_mounts=[volume_mount],
                               name="test",
                               task_id="task"
                               )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 6d2c62d..498c47b 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -46,7 +46,7 @@ RUN pip install -U setuptools && \
 COPY airflow.tar.gz /tmp/airflow.tar.gz
 RUN pip install /tmp/airflow.tar.gz
 
-COPY airflow-init.sh /tmp/airflow-init.sh
+COPY airflow-test-env-init.sh /tmp/airflow-test-env-init.sh
 
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
deleted file mode 100755
index cbd1c98..0000000
--- a/scripts/ci/kubernetes/docker/airflow-init.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-#
-#  Licensed to the Apache Software Foundation (ASF) under one   *
-#  or more contributor license agreements.  See the NOTICE file *
-#  distributed with this work for additional information        *
-#  regarding copyright ownership.  The ASF licenses this file   *
-#  to you under the Apache License, Version 2.0 (the            *
-#  "License"); you may not use this file except in compliance   *
-#  with the License.  You may obtain a copy of the License at   *
-#                                                               *
-#    http://www.apache.org/licenses/LICENSE-2.0                 *
-#                                                               *
-#  Unless required by applicable law or agreed to in writing,   *
-#  software distributed under the License is distributed on an  *
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-#  KIND, either express or implied.  See the License for the    *
-#  specific language governing permissions and limitations      *
-#  under the License.
-
-cd /usr/local/lib/python2.7/dist-packages/airflow && \
-cp -R example_dags/* /root/airflow/dags/ && \
-airflow initdb && \
-alembic upgrade heads && \
-(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-test-env-init.sh b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
new file mode 100755
index 0000000..aa86da7
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+#
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true) && \
+echo "retrieved from mount" > /root/test_volume/test.txt

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 09bbcd8..4f451ba 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -51,6 +51,8 @@ spec:
           subPath: airflow.cfg
         - name: airflow-dags
           mountPath: /root/airflow/dags
+        - name: test-volume
+          mountPath: /root/test_volume
         env:
         - name: SQL_ALCHEMY_CONN
           valueFrom:
@@ -61,7 +63,7 @@ spec:
           - "bash"
         args:
           - "-cx"
-          - "./tmp/airflow-init.sh"
+          - "./tmp/airflow-test-env-init.sh"
       containers:
       - name: webserver
         image: airflow
@@ -128,6 +130,9 @@ spec:
       - name: airflow-dags
         persistentVolumeClaim:
           claimName: airflow-dags
+      - name: test-volume
+        persistentVolumeClaim:
+          claimName: test-volume
       - name: airflow-logs
         persistentVolumeClaim:
           claimName: airflow-logs

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 58ad368..b5488e7 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,4 +62,26 @@ spec:
   resources:
     requests:
       storage: 2Gi
-
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+  name: test-volume
+spec:
+  accessModes:
+    - ReadWriteOnce
+  capacity:
+    storage: 2Gi
+  hostPath:
+    path: /airflow-dags/
+---
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: test-volume
+spec:
+  accessModes:
+    - ReadWriteMany
+  resources:
+    requests:
+      storage: 2Gi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 081fc04..8d888a3 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -21,6 +21,8 @@ from airflow import AirflowException
 from subprocess import check_call
 import mock
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.kubernetes.volume import Volume
 
 try:
     check_call(["kubectl", "get", "pods"])
@@ -37,7 +39,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
-            arguments=["echo", "10"],
+            arguments=["echo 10"],
             labels={"foo": "bar"},
             name="test",
             task_id="task"
@@ -50,14 +52,42 @@ class KubernetesPodOperatorTest(unittest.TestCase):
                 namespace='default',
                 image="ubuntu:16.04",
                 cmds=["bash", "-cx"],
-                arguments=["echo", "10"],
+                arguments=["echo 10"],
                 labels={"foo": "bar"},
                 name="test",
                 task_id="task",
                 get_logs=True
             )
             k.execute(None)
-            mock_logger.info.assert_any_call(b"+ echo\n")
+            mock_logger.info.assert_any_call(b"+ echo 10\n")
+
+    def test_volume_mount(self):
+        """Mount the ``test-volume`` PVC at /root/mount_file and read back test.txt,
+        which airflow-test-env-init.sh writes through the same claim."""
+        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+            volume_mount = VolumeMount('test-volume',
+                                       mount_path='/root/mount_file',
+                                       sub_path=None,
+                                       read_only=True)
+
+            volume_config = {
+                'persistentVolumeClaim':
+                    {
+                        'claimName': 'test-volume'
+                    }
+            }
+            volume = Volume(name='test-volume', configs=volume_config)
+            k = KubernetesPodOperator(
+                namespace='default',
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["cat /root/mount_file/test.txt"],
+                labels={"foo": "bar"},
+                volume_mounts=[volume_mount],
+                volumes=[volume],
+                name="test",
+                task_id="task"
+            )
+            k.execute(None)
+            # expected text is what airflow-test-env-init.sh echoes into test.txt
+            mock_logger.info.assert_any_call(b"retrieved from mount\n")
 
     def test_faulty_image(self):
         bad_image_name = "foobar"
@@ -65,7 +95,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
             namespace='default',
             image=bad_image_name,
             cmds=["bash", "-cx"],
-            arguments=["echo", "10"],
+            arguments=["echo 10"],
             labels={"foo": "bar"},
             name="test",
             task_id="task",
@@ -85,7 +115,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
-            arguments=[bad_internal_command, "10"],
+            arguments=[bad_internal_command + " 10"],
             labels={"foo": "bar"},
             name="test",
             task_id="task"