You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/04 19:52:28 UTC
[airflow] branch v1-10-test updated: [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 76591ac [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
76591ac is described below
commit 76591ac1205170b6d09169ff87f4e6b39f676c1e
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Thu Jun 4 12:45:56 2020 -0700
[AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
- `security_context` was missing from docs of `KubernetesPodOperator`
- `KubernetesPodOperator` kwarg `in_cluster` erroneously defaults to
False in comparison to `default_args.py`, also default `do_xcom_push`
was overwritten to False in contradiction to `BaseOperator`
- `KubernetesPodOperator` kwarg `resources` is erroneously passed to
`base_operator`, instead should only go to `PodGenerator`. The two
have different syntax. (both on `master` and `v1-10-test` branches)
- `kubernetes/pod.py`: classes do not have `__slots__`
so they would accept arbitrary values in `setattr`
- Reduce amount of times the pod object is copied before execution
(cherry picked from commit cf38ddc0571634588883699f481c219a0bb2fbcd)
---
airflow/executors/kubernetes_executor.py | 2 +-
airflow/kubernetes/worker_configuration.py | 2 +-
tests/executors/test_kubernetes_executor.py | 35 +++++++++++++++++++++++++++
tests/kubernetes/test_worker_configuration.py | 15 ++++++++++++
4 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 6944274..b4e923c 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -216,7 +216,7 @@ class KubeConfig:
def _get_security_context_val(self, scontext):
val = conf.get(self.kubernetes_section, scontext)
if not val:
- return 0
+ return ""
else:
return int(val)
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 41a65b3..b2f69f7 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -164,7 +164,7 @@ class WorkerConfiguration(LoggingMixin):
if self.kube_config.git_sync_run_as_user != "":
init_containers.security_context = k8s.V1SecurityContext(
- run_as_user=self.kube_config.git_sync_run_as_user or 65533
+ run_as_user=self.kube_config.git_sync_run_as_user
) # git-sync user
return [init_containers]
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9073493..b809394 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -26,6 +26,8 @@ from urllib3 import HTTPResponse
from airflow.utils import timezone
from tests.compat import mock
+from tests.test_utils.config import conf_vars
+
try:
from kubernetes.client.rest import ApiException
@@ -33,6 +35,7 @@ try:
from airflow.configuration import conf # noqa: F401
from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.executors.kubernetes_executor import KubernetesExecutor
+ from airflow.executors.kubernetes_executor import KubeConfig
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore
@@ -120,6 +123,38 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
self.assertEqual(datetime_obj, new_datetime_obj)
+class TestKubeConfig(unittest.TestCase):
+ def setUp(self):
+ if AirflowKubernetesScheduler is None:
+ self.skipTest("kubernetes python package is not installed")
+
+ @conf_vars({
+ ('kubernetes', 'git_repo'): 'foo',
+ ('kubernetes', 'git_branch'): 'foo',
+ ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+ ('kubernetes', 'git_sync_run_as_user'): '0',
+ })
+ def test_kube_config_git_sync_run_as_user_root(self):
+ self.assertEqual(KubeConfig().git_sync_run_as_user, 0)
+
+ @conf_vars({
+ ('kubernetes', 'git_repo'): 'foo',
+ ('kubernetes', 'git_branch'): 'foo',
+ ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+ })
+ def test_kube_config_git_sync_run_as_user_not_present(self):
+ self.assertEqual(KubeConfig().git_sync_run_as_user, 65533)
+
+ @conf_vars({
+ ('kubernetes', 'git_repo'): 'foo',
+ ('kubernetes', 'git_branch'): 'foo',
+ ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+ ('kubernetes', 'git_sync_run_as_user'): '',
+ })
+ def test_kube_config_git_sync_run_as_user_empty_string(self):
+ self.assertEqual(KubeConfig().git_sync_run_as_user, '')
+
+
class TestKubernetesExecutor(unittest.TestCase):
"""
Tests if an ApiException from the Kube Client will cause the task to
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 1b15d98..6769f7a 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -297,6 +297,21 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.assertIsNone(init_containers[0].security_context)
+ def test_init_environment_using_git_sync_run_as_user_root(self):
+ # Tests if git_sync_run_as_user is '0', securityContext is created with
+ # the right uid
+
+ self.kube_config.dags_volume_claim = None
+ self.kube_config.dags_volume_host = None
+ self.kube_config.dags_in_image = None
+ self.kube_config.git_sync_run_as_user = 0
+
+ worker_config = WorkerConfiguration(self.kube_config)
+ init_containers = worker_config._get_init_containers()
+ self.assertTrue(init_containers) # check not empty
+
+ self.assertEqual(0, init_containers[0].security_context.run_as_user)
+
def test_make_pod_run_as_user_0(self):
# Tests the pod created with run-as-user 0 actually gets that in its config
self.kube_config.worker_run_as_user = 0