You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this export.)
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/02 20:40:16 UTC

[GitHub] Fokko closed pull request #4247: [AIRFLOW-3402] Support global k8s affinity and toleration configs

Fokko closed pull request #4247: [AIRFLOW-3402] Support global k8s affinity and toleration configs
URL: https://github.com/apache/incubator-airflow/pull/4247
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a merged pull request, it is
reproduced below for the sake of provenance.

Because this is a foreign pull request (from a fork), GitHub will not
otherwise display the diff, so it is supplied below:

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 536f0061e4..960fe8ff5f 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -664,6 +664,16 @@ gcp_service_account_keys =
 # It will raise an exception if called from a process not running in a kubernetes environment.
 in_cluster = True
 
+# Affinity configuration as a single-line JSON object.
+# See the affinity model for top-level key names (e.g. `nodeAffinity`, `podAffinity`, `podAntiAffinity`):
+#   https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core
+affinity =
+
+# A list of toleration objects as a single-line JSON array.
+# See the toleration model:
+#   https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core
+tolerations =
+
 [kubernetes_node_selectors]
 # The Key-value pairs to be given to worker pods.
 # The worker pods will be scheduled to the nodes of the specified key-value pairs.
diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py
index 1d9bb73043..d03e255ab3 100644
--- a/airflow/contrib/example_dags/example_kubernetes_executor.py
+++ b/airflow/contrib/example_dags/example_kubernetes_executor.py
@@ -32,6 +32,31 @@
     schedule_interval=None
 )
 
+affinity = {
+    'podAntiAffinity': {
+        'requiredDuringSchedulingIgnoredDuringExecution': [
+            {
+                'topologyKey': 'kubernetes.io/hostname',
+                'labelSelector': {
+                    'matchExpressions': [
+                        {
+                            'key': 'app',
+                            'operator': 'In',
+                            'values': ['airflow']
+                        }
+                    ]
+                }
+            }
+        ]
+    }
+}
+
+tolerations = [{
+    'key': 'dedicated',
+    'operator': 'Equal',
+    'value': 'airflow'
+}]
+
 
 def print_stuff():
     print("stuff!")
@@ -59,11 +84,14 @@ def use_zip_binary():
     executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
 )
 
-# Limit resources on this operator/task
+# Limit resources on this operator/task with node affinity & tolerations
 three_task = PythonOperator(
     task_id="three_task", python_callable=print_stuff, dag=dag,
     executor_config={
-        "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
+        "KubernetesExecutor": {"request_memory": "128Mi",
+                               "limit_memory": "128Mi",
+                               "tolerations": tolerations,
+                               "affinity": affinity}}
 )
 
 start_task.set_downstream([one_task, two_task, three_task])
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 5cb27d7551..fa81cf3203 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import base64
+import json
 import multiprocessing
 from queue import Queue
 from dateutil import parser
@@ -40,7 +41,7 @@ class KubernetesExecutorConfig:
     def __init__(self, image=None, image_pull_policy=None, request_memory=None,
                  request_cpu=None, limit_memory=None, limit_cpu=None,
                  gcp_service_account_key=None, node_selectors=None, affinity=None,
-                 annotations=None, volumes=None, volume_mounts=None):
+                 annotations=None, volumes=None, volume_mounts=None, tolerations=None):
         self.image = image
         self.image_pull_policy = image_pull_policy
         self.request_memory = request_memory
@@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
         self.annotations = annotations
         self.volumes = volumes
         self.volume_mounts = volume_mounts
+        self.tolerations = tolerations
 
     def __repr__(self):
         return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
                "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
                "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
-               "volume_mounts={})" \
+               "volume_mounts={}, tolerations={})" \
             .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
                     self.request_memory, self.request_cpu, self.limit_memory,
                     self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
-                    self.affinity, self.annotations, self.volumes, self.volume_mounts)
+                    self.affinity, self.annotations, self.volumes, self.volume_mounts,
+                    self.tolerations)
 
     @staticmethod
     def from_dict(obj):
@@ -88,6 +91,7 @@ def from_dict(obj):
             annotations=namespaced.get('annotations', {}),
             volumes=namespaced.get('volumes', []),
             volume_mounts=namespaced.get('volume_mounts', []),
+            tolerations=namespaced.get('tolerations', None),
         )
 
     def as_dict(self):
@@ -104,6 +108,7 @@ def as_dict(self):
             'annotations': self.annotations,
             'volumes': self.volumes,
             'volume_mounts': self.volume_mounts,
+            'tolerations': self.tolerations,
         }
 
 
@@ -217,6 +222,18 @@ def __init__(self):
         # configmap
         self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
 
+        affinity_json = conf.get(self.kubernetes_section, 'affinity')
+        if affinity_json:
+            self.kube_affinity = json.loads(affinity_json)
+        else:
+            self.kube_affinity = None
+
+        tolerations_json = conf.get(self.kubernetes_section, 'tolerations')
+        if tolerations_json:
+            self.kube_tolerations = json.loads(tolerations_json)
+        else:
+            self.kube_tolerations = None
+
         self._validate()
 
     def _validate(self):
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index bad5caa738..6d29775925 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -60,6 +60,10 @@ class Pod:
     :type image_pull_secrets: str
     :param affinity: A dict containing a group of affinity scheduling rules
     :type affinity: dict
+    :param hostnetwork: If True enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
     """
     def __init__(
             self,
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 9a6f21340d..7b9a942de6 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -222,13 +222,16 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
             limit_cpu=kube_executor_config.limit_cpu
         )
         gcp_sa_key = kube_executor_config.gcp_service_account_key
-        annotations = kube_executor_config.annotations.copy()
+        annotations = dict(kube_executor_config.annotations)
         if gcp_sa_key:
             annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
 
         volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes
         volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts
 
+        affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
+        tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
+
         return Pod(
             namespace=namespace,
             name=pod_id,
@@ -253,5 +256,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
             annotations=annotations,
             node_selectors=(kube_executor_config.node_selectors or
                             self.kube_config.kube_node_selectors),
-            affinity=kube_executor_config.affinity
+            affinity=affinity,
+            tolerations=tolerations
         )
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f6c1f9d45e..3a5bef5381 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator):
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
     :type xcom_push: bool
-    :param tolerations: Kubernetes tolerations
-    :type list of tolerations
+    :param hostnetwork: If True enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
diff --git a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
index 0ca6d423e4..353fdd94bb 100644
--- a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
+++ b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
@@ -199,6 +199,10 @@ data:
     namespace = default
     gcp_service_account_keys =
 
+    # Example affinity and toleration definitions.
+    affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
+    tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
+
     # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
     git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
     git_sync_container_tag = v2.0.5
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index 4836693002..76cd9fb2c8 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -14,6 +14,8 @@
 #
 
 import unittest
+import uuid
+
 import mock
 import re
 import string
@@ -25,6 +27,7 @@
     from kubernetes.client.rest import ApiException
     from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
+    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
     from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
@@ -85,13 +88,41 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     options are passed to worker pod config
     """
 
+    affinity_config = {
+        'podAntiAffinity': {
+            'requiredDuringSchedulingIgnoredDuringExecution': [
+                {
+                    'topologyKey': 'kubernetes.io/hostname',
+                    'labelSelector': {
+                        'matchExpressions': [
+                            {
+                                'key': 'app',
+                                'operator': 'In',
+                                'values': ['airflow']
+                            }
+                        ]
+                    }
+                }
+            ]
+        }
+    }
+
+    tolerations_config = [
+        {
+            'key': 'dedicated',
+            'operator': 'Equal',
+            'value': 'airflow'
+        },
+        {
+            'key': 'prod',
+            'operator': 'Exists'
+        }
+    ]
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
 
-        self.pod = mock.patch(
-            'airflow.contrib.kubernetes.worker_configuration.Pod'
-        )
         self.resources = mock.patch(
             'airflow.contrib.kubernetes.worker_configuration.Resources'
         )
@@ -99,7 +130,7 @@ def setUp(self):
             'airflow.contrib.kubernetes.worker_configuration.Secret'
         )
 
-        for patcher in [self.pod, self.resources, self.secret]:
+        for patcher in [self.resources, self.secret]:
             self.mock_foo = patcher.start()
             self.addCleanup(patcher.stop)
 
@@ -200,6 +231,55 @@ def test_worker_environment_dags_folder_using_git_sync(self):
 
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
+    def test_make_pod_with_empty_executor_config(self):
+        self.kube_config.kube_affinity = self.affinity_config
+        self.kube_config.kube_tolerations = self.tolerations_config
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        kube_executor_config = KubernetesExecutorConfig(annotations=[],
+                                                        volumes=[],
+                                                        volume_mounts=[]
+                                                        )
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
+                                     kube_executor_config)
+
+        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         pod.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.tolerations))
+        self.assertEqual('prod', pod.tolerations[1]['key'])
+
+    def test_make_pod_with_executor_config(self):
+        worker_config = WorkerConfiguration(self.kube_config)
+        kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
+                                                        tolerations=self.tolerations_config,
+                                                        annotations=[],
+                                                        volumes=[],
+                                                        volume_mounts=[]
+                                                        )
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
+                                     kube_executor_config)
+
+        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         pod.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.tolerations))
+        self.assertEqual('prod', pod.tolerations[1]['key'])
+
     def test_worker_pvc_dags(self):
         # Tests persistence volume config created when `dags_volume_claim` is set
         self.kube_config.dags_volume_claim = 'airflow-dags'


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services