You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/08/11 15:47:41 UTC

[airflow] branch v1-10-test updated (14ca77d -> a281faa)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 14ca77d  Fix KubernetesPodOperator reattachment (#10230)
     new 71c6ace  Makes multi-namespace mode optional (#9570)
     new a281faa  fix static tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/config_templates/config.yml                |  7 +++++
 airflow/config_templates/default_airflow.cfg       |  4 +++
 .../contrib/operators/kubernetes_pod_operator.py   |  2 --
 airflow/executors/kubernetes_executor.py           | 32 ++++++++++++++++++----
 4 files changed, 38 insertions(+), 7 deletions(-)


[airflow] 02/02: fix static tests

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a281faa00627ebdbde3c947e2857ae3e60b9273e
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Tue Aug 11 08:47:00 2020 -0700

    fix static tests
---
 airflow/contrib/operators/kubernetes_pod_operator.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 12fc59b..98464b7 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -316,8 +316,6 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
             final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
         return final_state, result
 
-
-
     @staticmethod
     def _get_pod_identifying_label_string(labels):
         filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}


[airflow] 01/02: Makes multi-namespace mode optional (#9570)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 71c6ace947e5c048f4e8d227664989932b8f57f7
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

    Makes multi-namespace mode optional (#9570)
    
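    Running the Airflow KubernetesExecutor with multi-namespace support
    requires creating a ClusterRole, which can break existing deployments.
    Multi-namespace mode is therefore made opt-in through the new
    ``multi_namespace_mode`` option in the ``[kubernetes]`` section
    (default: False).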
    
    Co-authored-by: Daniel Imberman <da...@astronomer.io>
    (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml          |  7 ++++++
 airflow/config_templates/default_airflow.cfg |  4 ++++
 airflow/executors/kubernetes_executor.py     | 32 +++++++++++++++++++++++-----
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
       type: string
       example: ~
       default: "default"
+    - name: multi_namespace_mode
+      description: |
+        Allows users to launch pods in multiple namespaces.
+        Will require creating a cluster-role for the scheduler
+      type: boolean
+      example: ~
+      default: "False"
     - name: airflow_configmap
       description: |
         The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
     :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
         # cluster has RBAC enabled, your scheduler may need service account permissions to
         # create, watch, get, and delete pods in this namespace.
         self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+        self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
         # The Kubernetes Namespace in which pods will be created by the executor. Note
         # that if your
         # cluster has RBAC enabled, your workers may need service account permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
     """Watches for Kubernetes jobs"""
-    def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
+
+    def __init__(self,
+                 namespace,
+                 mult_namespace_mode,
+                 watcher_queue,
+                 resource_version,
+                 worker_uuid,
+                 kube_config):
         multiprocessing.Process.__init__(self)
         self.namespace = namespace
+        self.multi_namespace_mode = mult_namespace_mode
         self.worker_uuid = worker_uuid
         self.watcher_queue = watcher_queue
         self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
                 kwargs[key] = value
 
         last_resource_version = None
-        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
-                                    **kwargs):
+        if self.multi_namespace_mode:
+            list_worker_pods = functools.partial(watcher.stream,
+                                                 kube_client.list_pod_for_all_namespaces,
+                                                 **kwargs)
+        else:
+            list_worker_pods = functools.partial(watcher.stream,
+                                                 kube_client.list_namespaced_pod,
+                                                 self.namespace,
+                                                 **kwargs)
+        for event in list_worker_pods():
             task = event['object']
             self.log.info(
                 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def _make_kube_watcher(self):
         resource_version = KubeResourceVersion.get_current_resource_version()
-        watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
-                                       resource_version, self.worker_uuid, self.kube_config)
+        watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
+                                       namespace=self.kube_config.kube_namespace,
+                                       mult_namespace_mode=self.kube_config.multi_namespace_mode,
+                                       resource_version=resource_version,
+                                       worker_uuid=self.worker_uuid,
+                                       kube_config=self.kube_config)
         watcher.start()
         return watcher