You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/14 19:58:59 UTC
incubator-airflow git commit: [AIRFLOW-2460] Users can now use volume
mounts and volumes when launching pods using k8s operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 6c19468e0 -> 0b6a7000c
[AIRFLOW-2460] Users can now use volume mounts and volumes when
launching pods using k8s operator
Closes #3356 from dimberman/k8s-mounts
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0b6a7000
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0b6a7000
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0b6a7000
Branch: refs/heads/master
Commit: 0b6a7000c063d8bf744a1b907f4e1d7c04f57e57
Parents: 6c19468
Author: Daniel Imberman <da...@gmail.com>
Authored: Mon May 14 21:58:39 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon May 14 21:58:39 2018 +0200
----------------------------------------------------------------------
airflow/contrib/kubernetes/pod_generator.py | 43 ++++++++++++++++----
airflow/contrib/kubernetes/volume.py | 33 +++++++++++++++
airflow/contrib/kubernetes/volume_mount.py | 37 +++++++++++++++++
.../operators/kubernetes_pod_operator.py | 24 +++++++++--
docs/kubernetes.rst | 15 ++++++-
scripts/ci/kubernetes/docker/Dockerfile | 2 +-
scripts/ci/kubernetes/docker/airflow-init.sh | 24 -----------
.../kubernetes/docker/airflow-test-env-init.sh | 25 ++++++++++++
scripts/ci/kubernetes/kube/airflow.yaml | 7 +++-
scripts/ci/kubernetes/kube/volumes.yaml | 24 ++++++++++-
.../minikube/test_kubernetes_pod_operator.py | 40 +++++++++++++++---
11 files changed, 230 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
index 78d3926..0f9dabd 100644
--- a/airflow/contrib/kubernetes/pod_generator.py
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -19,6 +19,8 @@ import os
from airflow.contrib.kubernetes.pod import Pod
import uuid
+from airflow.contrib.kubernetes.volume_mount import VolumeMount # noqa
+from airflow.contrib.kubernetes.volume import Volume # noqa
class PodGenerator:
@@ -64,16 +66,30 @@ class PodGenerator:
def _get_init_containers(self):
return self.init_containers
- def add_volume(self, name):
+ def add_volume(self, volume):
+ """Register a pod-level volume described by a Volume object.
+
+ Args:
+ volume (Volume): its name and configs become one entry in self.volumes
+ """
+ self._add_volume(name=volume.name, configs=volume.configs)
+
+ def _add_volume(self, name, configs):
"""
Args:
name (str):
+ configs (dict): Configurations for the volume.
+ Could be used to define PersistentVolumeClaim, ConfigMap, etc...
Returns:
"""
- self.volumes.append({'name': name})
+ volume_map = {'name': name}
+ for k, v in configs.items():
+ volume_map[k] = v
+
+ self.volumes.append(volume_map)
def add_volume_with_configmap(self, name, config_map):
self.volumes.append(
@@ -83,11 +99,11 @@ class PodGenerator:
}
)
- def add_mount(self,
- name,
- mount_path,
- sub_path,
- read_only):
+ def _add_mount(self,
+ name,
+ mount_path,
+ sub_path,
+ read_only):
"""
Args:
@@ -107,6 +123,19 @@ class PodGenerator:
'readOnly': read_only
})
+ def add_mount(self,
+ volume_mount):
+ """Register a container volume mount described by a VolumeMount object.
+ Args:
+ volume_mount (VolumeMount): its fields are forwarded to _add_mount
+ """
+ self._add_mount(
+ name=volume_mount.name,
+ mount_path=volume_mount.mount_path,
+ sub_path=volume_mount.sub_path,
+ read_only=volume_mount.read_only
+ )
+
def _get_volumes_and_mounts(self):
return self.volumes, self.volume_mounts
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/volume.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/volume.py b/airflow/contrib/kubernetes/volume.py
new file mode 100644
index 0000000..d5b4f60
--- /dev/null
+++ b/airflow/contrib/kubernetes/volume.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+class Volume:
+ """Defines Kubernetes Volume"""
+
+ def __init__(self, name, configs):
+ """ Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps
+ and Persistent Volumes
+ :param name: the name of the volume mount
+ :type: name: str
+ :param configs: dictionary of any features needed for volume.
+ We purposely keep this vague since there are multiple volume types with changing
+ configs.
+ :type: configs: dict
+ """
+ self.name = name
+ self.configs = configs
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/kubernetes/volume_mount.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/volume_mount.py b/airflow/contrib/kubernetes/volume_mount.py
new file mode 100644
index 0000000..4bdf09c
--- /dev/null
+++ b/airflow/contrib/kubernetes/volume_mount.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+class VolumeMount:
+ """Defines Kubernetes Volume Mount"""
+
+ def __init__(self, name, mount_path, sub_path, read_only):
+ """Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to
+ running container.
+ :param name: the name of the volume mount
+ :type name: str
+ :param mount_path:
+ :type mount_path: str
+ :param sub_path: subpath within the volume mount
+ :type sub_path: str
+ :param read_only: whether to access pod with read-only mode
+ :type read_only: bool
+ """
+ self.name = name
+ self.mount_path = mount_path
+ self.sub_path = sub_path
+ self.read_only = read_only
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 0f9c943..ffdc2cf 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -21,6 +21,9 @@ from airflow.utils.decorators import apply_defaults
from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.contrib.kubernetes.pod import Resources
from airflow.utils.state import State
+from airflow.contrib.kubernetes.volume_mount import VolumeMount # noqa
+from airflow.contrib.kubernetes.volume import Volume # noqa
+from airflow.contrib.kubernetes.secret import Secret # noqa
template_fields = ('templates_dict',)
template_ext = tuple()
@@ -37,10 +40,14 @@ class KubernetesPodOperator(BaseOperator):
:type: namespace: str
:param cmds: entrypoint of the container.
The docker images's entrypoint is used if this is not provide.
- :type cmds: list
+ :type cmds: list of str
:param arguments: arguments of to the entrypoint.
The docker image's CMD is used if this is not provided.
- :type arguments: list
+ :type arguments: list of str
+ :param volume_mounts: volumeMounts for launched pod
+ :type volume_mounts: list of VolumeMount
+ :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
+ :type volumes: list of Volume
:param labels: labels to apply to the Pod
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod
@@ -52,7 +59,7 @@ class KubernetesPodOperator(BaseOperator):
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container,
They can be exposed as environment vars or files in a volume.
- :type secrets: list
+ :type secrets: list of Secret
:param in_cluster: run kubernetes client with in_cluster configuration
:type in_cluster: bool
:param get_logs: get the stdout of the container as logs of the tasks
@@ -65,13 +72,18 @@ class KubernetesPodOperator(BaseOperator):
client = kube_client.get_kube_client(in_cluster=self.in_cluster)
gen = pod_generator.PodGenerator()
+ for mount in self.volume_mounts:
+ gen.add_mount(mount)
+ for volume in self.volumes:
+ gen.add_volume(volume)
+
pod = gen.make_pod(
namespace=self.namespace,
image=self.image,
pod_id=self.name,
cmds=self.cmds,
arguments=self.arguments,
- labels=self.labels
+ labels=self.labels,
)
pod.secrets = self.secrets
@@ -99,6 +111,8 @@ class KubernetesPodOperator(BaseOperator):
name,
cmds=None,
arguments=None,
+ volume_mounts=None,
+ volumes=None,
env_vars=None,
secrets=None,
in_cluster=False,
@@ -119,6 +133,8 @@ class KubernetesPodOperator(BaseOperator):
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
self.env_vars = env_vars or {}
+ self.volume_mounts = volume_mounts or []
+ self.volumes = volumes or []
self.secrets = secrets or []
self.in_cluster = in_cluster
self.get_logs = get_logs
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 4b83fc0..cb0ad87 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -19,13 +19,26 @@ Kubernetes Operator
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-
+ volume_mount = VolumeMount('test-volume',
+ mount_path='/root/mount_file',
+ sub_path=None,
+ read_only=True)
+
+ volume_config= {
+ 'persistentVolumeClaim':
+ {
+ 'claimName': 'test-volume'
+ }
+ }
+ volume = Volume(name='test-volume', configs=volume_config)
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file,secret_env]
+ volume=[volume],
+ volume_mounts=[volume_mount]
name="test",
task_id="task"
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index 6d2c62d..498c47b 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -46,7 +46,7 @@ RUN pip install -U setuptools && \
COPY airflow.tar.gz /tmp/airflow.tar.gz
RUN pip install /tmp/airflow.tar.gz
-COPY airflow-init.sh /tmp/airflow-init.sh
+COPY airflow-test-env-init.sh /tmp/airflow-test-env-init.sh
COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/airflow-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh
deleted file mode 100755
index cbd1c98..0000000
--- a/scripts/ci/kubernetes/docker/airflow-init.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one *
-# or more contributor license agreements. See the NOTICE file *
-# distributed with this work for additional information *
-# regarding copyright ownership. The ASF licenses this file *
-# to you under the Apache License, Version 2.0 (the *
-# "License"); you may not use this file except in compliance *
-# with the License. You may obtain a copy of the License at *
-# *
-# http://www.apache.org/licenses/LICENSE-2.0 *
-# *
-# Unless required by applicable law or agreed to in writing, *
-# software distributed under the License is distributed on an *
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
-# KIND, either express or implied. See the License for the *
-# specific language governing permissions and limitations *
-# under the License.
-
-cd /usr/local/lib/python2.7/dist-packages/airflow && \
-cp -R example_dags/* /root/airflow/dags/ && \
-airflow initdb && \
-alembic upgrade heads && \
-(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/airflow-test-env-init.sh b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
new file mode 100755
index 0000000..aa86da7
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one *
+# or more contributor license agreements. See the NOTICE file *
+# distributed with this work for additional information *
+# regarding copyright ownership. The ASF licenses this file *
+# to you under the Apache License, Version 2.0 (the *
+# "License"); you may not use this file except in compliance *
+# with the License. You may obtain a copy of the License at *
+# *
+# http://www.apache.org/licenses/LICENSE-2.0 *
+# *
+# Unless required by applicable law or agreed to in writing, *
+# software distributed under the License is distributed on an *
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+# KIND, either express or implied. See the License for the *
+# specific language governing permissions and limitations *
+# under the License.
+
+cd /usr/local/lib/python2.7/dist-packages/airflow && \
+cp -R example_dags/* /root/airflow/dags/ && \
+airflow initdb && \
+alembic upgrade heads && \
+(airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true) && \
+echo "retrieved from mount" > /root/test_volume/test.txt
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/kube/airflow.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml
index 09bbcd8..4f451ba 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml
+++ b/scripts/ci/kubernetes/kube/airflow.yaml
@@ -51,6 +51,8 @@ spec:
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
+ - name: test-volume
+ mountPath: /root/test_volume
env:
- name: SQL_ALCHEMY_CONN
valueFrom:
@@ -61,7 +63,7 @@ spec:
- "bash"
args:
- "-cx"
- - "./tmp/airflow-init.sh"
+ - "./tmp/airflow-test-env-init.sh"
containers:
- name: webserver
image: airflow
@@ -128,6 +130,9 @@ spec:
- name: airflow-dags
persistentVolumeClaim:
claimName: airflow-dags
+ - name: test-volume
+ persistentVolumeClaim:
+ claimName: test-volume
- name: airflow-logs
persistentVolumeClaim:
claimName: airflow-logs
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/scripts/ci/kubernetes/kube/volumes.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml
index 58ad368..b5488e7 100644
--- a/scripts/ci/kubernetes/kube/volumes.yaml
+++ b/scripts/ci/kubernetes/kube/volumes.yaml
@@ -62,4 +62,26 @@ spec:
resources:
requests:
storage: 2Gi
-
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+ name: test-volume
+spec:
+ accessModes:
+ - ReadWriteOnce
+ capacity:
+ storage: 2Gi
+ hostPath:
+ path: /airflow-dags/
+---
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: test-volume
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 2Gi
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0b6a7000/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 081fc04..8d888a3 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -21,6 +21,8 @@ from airflow import AirflowException
from subprocess import check_call
import mock
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.contrib.kubernetes.volume_mount import VolumeMount
+from airflow.contrib.kubernetes.volume import Volume
try:
check_call(["kubectl", "get", "pods"])
@@ -37,7 +39,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
- arguments=["echo", "10"],
+ arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task"
@@ -50,14 +52,42 @@ class KubernetesPodOperatorTest(unittest.TestCase):
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
- arguments=["echo", "10"],
+ arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
get_logs=True
)
k.execute(None)
- mock_logger.info.assert_any_call(b"+ echo\n")
+ mock_logger.info.assert_any_call(b"+ echo 10\n")
+
+ def test_volume_mount(self):
+ with mock.patch.object(PodLauncher, 'log') as mock_logger:
+ volume_mount = VolumeMount('test-volume',
+ mount_path='/root/mount_file',
+ sub_path=None,
+ read_only=True)
+
+ volume_config = {
+ 'persistentVolumeClaim':
+ {
+ 'claimName': 'test-volume'
+ }
+ }
+ volume = Volume(name='test-volume', configs=volume_config)
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["cat /root/mount_file/test.txt"],
+ labels={"foo": "bar"},
+ volume_mounts=[volume_mount],
+ volumes=[volume],
+ name="test",
+ task_id="task"
+ )
+ k.execute(None)
+ mock_logger.info.assert_any_call(b"retrieved from mount\n")
def test_faulty_image(self):
bad_image_name = "foobar"
@@ -65,7 +95,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
namespace='default',
image=bad_image_name,
cmds=["bash", "-cx"],
- arguments=["echo", "10"],
+ arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
@@ -85,7 +115,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
- arguments=[bad_internal_command, "10"],
+ arguments=[bad_internal_command + " 10"],
labels={"foo": "bar"},
name="test",
task_id="task"