You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:40 UTC

[airflow] 05/47: Makes multi-namespace mode optional (#9570)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 66d0210f4635e742e7bb4c89b128c60ea351f0d5
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

    Makes multi-namespace mode optional (#9570)
    
    Running the Airflow KubernetesExecutor with multi-namespace support
    requires creating a ClusterRole, which can break existing deployments.
    
    Co-authored-by: Daniel Imberman <da...@astronomer.io>
    (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml          |  7 ++++++
 airflow/config_templates/default_airflow.cfg |  4 ++++
 airflow/executors/kubernetes_executor.py     | 32 +++++++++++++++++++++++-----
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
       type: string
       example: ~
       default: "default"
+    - name: multi_namespace_mode
+      description: |
+        Allows users to launch pods in multiple namespaces.
+        Will require creating a cluster-role for the scheduler
+      type: boolean
+      example: ~
+      default: "False"
     - name: airflow_configmap
       description: |
         The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
     :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
         # cluster has RBAC enabled, your scheduler may need service account permissions to
         # create, watch, get, and delete pods in this namespace.
         self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+        self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
         # The Kubernetes Namespace in which pods will be created by the executor. Note
         # that if your
         # cluster has RBAC enabled, your workers may need service account permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
     """Watches for Kubernetes jobs"""
-    def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
+
+    def __init__(self,
+                 namespace,
+                 mult_namespace_mode,
+                 watcher_queue,
+                 resource_version,
+                 worker_uuid,
+                 kube_config):
         multiprocessing.Process.__init__(self)
         self.namespace = namespace
+        self.multi_namespace_mode = mult_namespace_mode
         self.worker_uuid = worker_uuid
         self.watcher_queue = watcher_queue
         self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
                 kwargs[key] = value
 
         last_resource_version = None
-        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
-                                    **kwargs):
+        if self.multi_namespace_mode:
+            list_worker_pods = functools.partial(watcher.stream,
+                                                 kube_client.list_pod_for_all_namespaces,
+                                                 **kwargs)
+        else:
+            list_worker_pods = functools.partial(watcher.stream,
+                                                 kube_client.list_namespaced_pod,
+                                                 self.namespace,
+                                                 **kwargs)
+        for event in list_worker_pods():
             task = event['object']
             self.log.info(
                 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def _make_kube_watcher(self):
         resource_version = KubeResourceVersion.get_current_resource_version()
-        watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
-                                       resource_version, self.worker_uuid, self.kube_config)
+        watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
+                                       namespace=self.kube_config.kube_namespace,
+                                       mult_namespace_mode=self.kube_config.multi_namespace_mode,
+                                       resource_version=resource_version,
+                                       worker_uuid=self.worker_uuid,
+                                       kube_config=self.kube_config)
         watcher.start()
         return watcher