You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/11 22:35:13 UTC
[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Aug 10 13:41:40 2020 -0700
Makes multi-namespace mode optional (#9570)
Running the Airflow KubernetesExecutor with multi-namespace support
requires creating a ClusterRole, which can break existing deployments.
Co-authored-by: Daniel Imberman <da...@astronomer.io>
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
airflow/config_templates/config.yml | 7 ++++++
airflow/config_templates/default_airflow.cfg | 4 ++++
airflow/executors/kubernetes_executor.py | 32 +++++++++++++++++++++++-----
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
type: string
example: ~
default: "default"
+ - name: multi_namespace_mode
+ description: |
+ Allows users to launch pods in multiple namespaces.
+ Will require creating a cluster-role for the scheduler
+ type: boolean
+ example: ~
+ default: "False"
- name: airflow_configmap
description: |
The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
# The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
namespace = default
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
# The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
# Example: airflow_configmap = airflow-configmap
airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
:ref:`executor:KubernetesExecutor`
"""
import base64
+import functools
import json
import multiprocessing
import time
@@ -162,6 +163,7 @@ class KubeConfig:
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+ self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""
- def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
+
+ def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
multiprocessing.Process.__init__(self)
self.namespace = namespace
+ self.multi_namespace_mode = mult_namespace_mode
self.worker_uuid = worker_uuid
self.watcher_queue = watcher_queue
self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
kwargs[key] = value
last_resource_version = None
- for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
- **kwargs):
+ if self.multi_namespace_mode:
+ list_worker_pods = functools.partial(watcher.stream,
+ kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+ else:
+ list_worker_pods = functools.partial(watcher.stream,
+ kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+ for event in list_worker_pods():
task = event['object']
self.log.info(
'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
def _make_kube_watcher(self):
resource_version = KubeResourceVersion.get_current_resource_version()
- watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
- resource_version, self.worker_uuid, self.kube_config)
+ watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
+ namespace=self.kube_config.kube_namespace,
+ mult_namespace_mode=self.kube_config.multi_namespace_mode,
+ resource_version=resource_version,
+ worker_uuid=self.worker_uuid,
+ kube_config=self.kube_config)
watcher.start()
return watcher