You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/04 19:52:28 UTC

[airflow] branch v1-10-test updated: [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 76591ac  [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
76591ac is described below

commit 76591ac1205170b6d09169ff87f4e6b39f676c1e
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Thu Jun 4 12:45:56 2020 -0700

    [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
    
    - `security_context` was missing from docs of `KubernetesPodOperator`
    - `KubernetesPodOperator` kwarg `in_cluster` erroneously defaults to
    False in comparison to `default_args.py`, also default `do_xcom_push`
    was overwritten to False in contradiction to `BaseOperator`
    - `KubernetesPodOperator` kwarg `resources` is erroneously passed to
    `base_operator`, instead should only go to `PodGenerator`. The two
    have different syntax. (both on `master` and `v1-10-test` branches)
    - `kubernetes/pod.py`: classes do not have `__slots__`
    so they would accept arbitrary values in `setattr`
    - Reduce the number of times the pod object is copied before execution
    
    (cherry picked from commit cf38ddc0571634588883699f481c219a0bb2fbcd)
---
 airflow/executors/kubernetes_executor.py      |  2 +-
 airflow/kubernetes/worker_configuration.py    |  2 +-
 tests/executors/test_kubernetes_executor.py   | 35 +++++++++++++++++++++++++++
 tests/kubernetes/test_worker_configuration.py | 15 ++++++++++++
 4 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 6944274..b4e923c 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -216,7 +216,7 @@ class KubeConfig:
     def _get_security_context_val(self, scontext):
         val = conf.get(self.kubernetes_section, scontext)
         if not val:
-            return 0
+            return ""
         else:
             return int(val)
 
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 41a65b3..b2f69f7 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -164,7 +164,7 @@ class WorkerConfiguration(LoggingMixin):
 
         if self.kube_config.git_sync_run_as_user != "":
             init_containers.security_context = k8s.V1SecurityContext(
-                run_as_user=self.kube_config.git_sync_run_as_user or 65533
+                run_as_user=self.kube_config.git_sync_run_as_user
             )  # git-sync user
 
         return [init_containers]
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9073493..b809394 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -26,6 +26,8 @@ from urllib3 import HTTPResponse
 
 from airflow.utils import timezone
 from tests.compat import mock
+from tests.test_utils.config import conf_vars
+
 
 try:
     from kubernetes.client.rest import ApiException
@@ -33,6 +35,7 @@ try:
     from airflow.configuration import conf  # noqa: F401
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubernetesExecutor
+    from airflow.executors.kubernetes_executor import KubeConfig
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -120,6 +123,38 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEqual(datetime_obj, new_datetime_obj)
 
 
+class TestKubeConfig(unittest.TestCase):
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '0',
+    })
+    def test_kube_config_git_sync_run_as_user_root(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 0)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+    })
+    def test_kube_config_git_sync_run_as_user_not_present(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 65533)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '',
+    })
+    def test_kube_config_git_sync_run_as_user_empty_string(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, '')
+
+
 class TestKubernetesExecutor(unittest.TestCase):
     """
     Tests if an ApiException from the Kube Client will cause the task to
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 1b15d98..6769f7a 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -297,6 +297,21 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         self.assertIsNone(init_containers[0].security_context)
 
+    def test_init_environment_using_git_sync_run_as_user_root(self):
+        # Tests if git_sync_run_as_user is '0', securityContext is created with
+        # the right uid
+
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_sync_run_as_user = 0
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+        self.assertTrue(init_containers)  # check not empty
+
+        self.assertEqual(0, init_containers[0].security_context.run_as_user)
+
     def test_make_pod_run_as_user_0(self):
         # Tests the pod created with run-as-user 0 actually gets that in it's config
         self.kube_config.worker_run_as_user = 0