Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/07 20:46:07 UTC

[GitHub] kaxil closed pull request #4454: [AIRFLOW-3402] Port PR #4247 to 1.10-test

URL: https://github.com/apache/airflow/pull/4454

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c8aa4061e7..a72604a536 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -630,6 +630,16 @@ gcp_service_account_keys =
 # It will raise an exception if called from a process not running in a kubernetes environment.
 in_cluster = True
 
+# Affinity configuration as a single-line formatted JSON object.
+# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.):
+#   https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core
+affinity =
+
+# A list of toleration objects as a single-line formatted JSON array.
+# See:
+#   https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core
+tolerations =
+
 [kubernetes_node_selectors]
 # The Key-value pairs to be given to worker pods.
 # The worker pods will be scheduled to the nodes of the specified key-value pairs.
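
For reference, a minimal sketch (with hypothetical values) of how such
single-line JSON settings parse; the key names follow the Kubernetes v1
Affinity and Toleration models linked above:

    import json

    # Hypothetical single-line values as they might appear under [kubernetes]
    # in airflow.cfg; key names follow the Kubernetes v1 API models.
    affinity_json = ('{"nodeAffinity": {"requiredDuringSchedulingIgnoredDuringExecution": '
                     '{"nodeSelectorTerms": [{"matchExpressions": [{"key": "disktype", '
                     '"operator": "In", "values": ["ssd"]}]}]}}}')
    tolerations_json = ('[{"key": "dedicated", "operator": "Equal", '
                        '"value": "airflow", "effect": "NoSchedule"}]')

    affinity = json.loads(affinity_json)        # parses to a dict
    tolerations = json.loads(tolerations_json)  # parses to a list of dicts
    assert 'nodeAffinity' in affinity
    assert tolerations[0]['key'] == 'dedicated'
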
diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py
index 1d9bb73043..d03e255ab3 100644
--- a/airflow/contrib/example_dags/example_kubernetes_executor.py
+++ b/airflow/contrib/example_dags/example_kubernetes_executor.py
@@ -32,6 +32,31 @@
     schedule_interval=None
 )
 
+affinity = {
+    'podAntiAffinity': {
+        'requiredDuringSchedulingIgnoredDuringExecution': [
+            {
+                'topologyKey': 'kubernetes.io/hostname',
+                'labelSelector': {
+                    'matchExpressions': [
+                        {
+                            'key': 'app',
+                            'operator': 'In',
+                            'values': ['airflow']
+                        }
+                    ]
+                }
+            }
+        ]
+    }
+}
+
+tolerations = [{
+    'key': 'dedicated',
+    'operator': 'Equal',
+    'value': 'airflow'
+}]
+
 
 def print_stuff():
     print("stuff!")
@@ -59,11 +84,14 @@ def use_zip_binary():
     executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
 )
 
-# Limit resources on this operator/task
+# Limit resources on this operator/task with node affinity & tolerations
 three_task = PythonOperator(
     task_id="three_task", python_callable=print_stuff, dag=dag,
     executor_config={
-        "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
+        "KubernetesExecutor": {"request_memory": "128Mi",
+                               "limit_memory": "128Mi",
+                               "tolerations": tolerations,
+                               "affinity": affinity}}
 )
 
 start_task.set_downstream([one_task, two_task, three_task])
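
In plain terms, the example above asks the scheduler not to co-locate this
worker with any pod labeled app=airflow on the same hostname, and to tolerate
a dedicated=airflow taint. A tiny sketch of the label test the 'In' operator
expresses (illustrative only, not the real Kubernetes matcher):

    # Illustrative only: evaluate the single matchExpression above against a
    # candidate pod's labels the way an 'In' operator would.
    expr = {'key': 'app', 'operator': 'In', 'values': ['airflow']}
    candidate_labels = {'app': 'airflow'}
    matches = candidate_labels.get(expr['key']) in expr['values']
    assert matches  # a node already running such a pod would be avoided
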
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index dd9cd3ec53..e06a5f47e1 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import base64
+import json
 import multiprocessing
 from queue import Queue
 from dateutil import parser
@@ -40,7 +41,7 @@ class KubernetesExecutorConfig:
     def __init__(self, image=None, image_pull_policy=None, request_memory=None,
                  request_cpu=None, limit_memory=None, limit_cpu=None,
                  gcp_service_account_key=None, node_selectors=None, affinity=None,
-                 annotations=None, volumes=None, volume_mounts=None):
+                 annotations=None, volumes=None, volume_mounts=None, tolerations=None):
         self.image = image
         self.image_pull_policy = image_pull_policy
         self.request_memory = request_memory
@@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
         self.annotations = annotations
         self.volumes = volumes
         self.volume_mounts = volume_mounts
+        self.tolerations = tolerations
 
     def __repr__(self):
         return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
                "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
                "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
-               "volume_mounts={})" \
+               "volume_mounts={}, tolerations={})" \
             .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
                     self.request_memory, self.request_cpu, self.limit_memory,
                     self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
-                    self.affinity, self.annotations, self.volumes, self.volume_mounts)
+                    self.affinity, self.annotations, self.volumes, self.volume_mounts,
+                    self.tolerations)
 
     @staticmethod
     def from_dict(obj):
@@ -88,6 +91,7 @@ def from_dict(obj):
             annotations=namespaced.get('annotations', {}),
             volumes=namespaced.get('volumes', []),
             volume_mounts=namespaced.get('volume_mounts', []),
+            tolerations=namespaced.get('tolerations', None),
         )
 
     def as_dict(self):
@@ -104,6 +108,7 @@ def as_dict(self):
             'annotations': self.annotations,
             'volumes': self.volumes,
             'volume_mounts': self.volume_mounts,
+            'tolerations': self.tolerations,
         }
 
 
@@ -205,6 +210,18 @@ def __init__(self):
         # configmap
         self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')
 
+        affinity_json = conf.get(self.kubernetes_section, 'affinity')
+        if affinity_json:
+            self.kube_affinity = json.loads(affinity_json)
+        else:
+            self.kube_affinity = None
+
+        tolerations_json = conf.get(self.kubernetes_section, 'tolerations')
+        if tolerations_json:
+            self.kube_tolerations = json.loads(tolerations_json)
+        else:
+            self.kube_tolerations = None
+
         self._validate()
 
     def _validate(self):
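
Two behaviours in this hunk are easy to miss: an empty config string yields
None rather than an error from json.loads, and from_dict defaults tolerations
to None when the key is absent. A stand-alone sketch (not the real class):

    import json

    def parse_optional_json(raw):
        # Mirrors the guard above: an empty airflow.cfg value becomes None
        # instead of raising from json.loads('').
        return json.loads(raw) if raw else None

    assert parse_optional_json('') is None
    assert parse_optional_json('[{"key": "prod", "operator": "Exists"}]')[0]['key'] == 'prod'

    # from_dict side: a missing 'tolerations' key defaults to None, so the
    # cfg-level fallback in make_pod (below) can take over.
    namespaced = {'request_memory': '128Mi'}
    assert namespaced.get('tolerations', None) is None
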
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index bad5caa738..6d29775925 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -60,6 +60,10 @@ class Pod:
     :type image_pull_secrets: str
     :param affinity: A dict containing a group of affinity scheduling rules
     :type affinity: dict
+    :param hostnetwork: If True, enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
     """
     def __init__(
             self,
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index d83be81c1f..c2e7768baa 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -205,10 +205,13 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
             limit_cpu=kube_executor_config.limit_cpu
         )
         gcp_sa_key = kube_executor_config.gcp_service_account_key
-        annotations = kube_executor_config.annotations.copy()
+        annotations = dict(kube_executor_config.annotations)
         if gcp_sa_key:
             annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
 
+        affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
+        tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
+
         return Pod(
             namespace=namespace,
             name=pod_id,
@@ -234,5 +237,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
             annotations=annotations,
             node_selectors=(kube_executor_config.node_selectors or
                             self.kube_config.kube_node_selectors),
-            affinity=kube_executor_config.affinity
+            affinity=affinity,
+            tolerations=tolerations
         )
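
The two new assignments encode a simple precedence rule: the per-task
executor_config wins, and the airflow.cfg-level value is the fallback. A
minimal sketch with hypothetical values (note that any falsy task value,
including an empty list, also falls through to the default):

    # Hypothetical values: task-level config beats the cfg-level default.
    task_affinity = None                        # nothing set on the task
    cfg_affinity = {'nodeAffinity': {}}         # parsed from airflow.cfg
    assert (task_affinity or cfg_affinity) is cfg_affinity

    task_tolerations = [{'key': 'dedicated', 'operator': 'Exists'}]
    cfg_tolerations = [{'key': 'prod', 'operator': 'Exists'}]
    assert (task_tolerations or cfg_tolerations) is task_tolerations
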
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index a29b61998d..4494754f97 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator):
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
     :type xcom_push: bool
-    :param tolerations: Kubernetes tolerations
-    :type list of tolerations
+    :param hostnetwork: If True, enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
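
For completeness, a hedged usage sketch of the operator with the parameters
documented above; all values are illustrative and the dag object is assumed
to exist elsewhere:

    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    # Illustrative values only; `dag` is assumed to be defined elsewhere.
    pod_task = KubernetesPodOperator(
        task_id='tolerated_pod',
        name='tolerated-pod',
        namespace='default',
        image='python:3.6',
        cmds=['python', '-c', 'print("hello")'],
        tolerations=[{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}],
        hostnetwork=False,
        dag=dag,
    )
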
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
index c8e6b19076..b5fa3e5f63 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -192,6 +192,10 @@ data:
     namespace = default
     gcp_service_account_keys =
 
+    # Example affinity and toleration definitions.
+    affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
+    tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
+
     # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
     git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
     git_sync_container_tag = v2.0.5
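
Read closely, the example affinity steers workers away from one specific node
(NotIn on kubernetes.io/hostname), while the tolerations admit the pod onto
nodes tainted dedicated=airflow:NoSchedule or prod. A quick, illustrative
sanity check that such a value is one physical line of valid JSON:

    import json

    # Config values must stay on one physical line; json.loads catches
    # malformed input before the executor ever sees it.
    line = '[{"key": "prod", "operator": "Exists"}]'
    assert '\n' not in line and isinstance(json.loads(line), list)
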
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index f93a9d81e1..1b5c4c015d 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -14,6 +14,7 @@
 #
 
 import unittest
+import uuid
 import mock
 import re
 import string
@@ -22,6 +23,7 @@
 
 try:
     from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
     from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
@@ -81,13 +83,42 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     Tests that if dags_volume_subpath/logs_volume_subpath configuration
     options are passed to worker pod config
     """
+
+    affinity_config = {
+        'podAntiAffinity': {
+            'requiredDuringSchedulingIgnoredDuringExecution': [
+                {
+                    'topologyKey': 'kubernetes.io/hostname',
+                    'labelSelector': {
+                        'matchExpressions': [
+                            {
+                                'key': 'app',
+                                'operator': 'In',
+                                'values': ['airflow']
+                            }
+                        ]
+                    }
+                }
+            ]
+        }
+    }
+
+    tolerations_config = [
+        {
+            'key': 'dedicated',
+            'operator': 'Equal',
+            'value': 'airflow'
+        },
+        {
+            'key': 'prod',
+            'operator': 'Exists'
+        }
+    ]
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
 
-        self.pod = mock.patch(
-            'airflow.contrib.kubernetes.worker_configuration.Pod'
-        )
         self.resources = mock.patch(
             'airflow.contrib.kubernetes.worker_configuration.Resources'
         )
@@ -95,7 +126,7 @@ def setUp(self):
             'airflow.contrib.kubernetes.worker_configuration.Secret'
         )
 
-        for patcher in [self.pod, self.resources, self.secret]:
+        for patcher in [self.resources, self.secret]:
             self.mock_foo = patcher.start()
             self.addCleanup(patcher.stop)
 
@@ -152,6 +183,55 @@ def test_worker_environment_when_dags_folder_specified(self):
 
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
+    def test_make_pod_with_empty_executor_config(self):
+        self.kube_config.kube_affinity = self.affinity_config
+        self.kube_config.kube_tolerations = self.tolerations_config
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        kube_executor_config = KubernetesExecutorConfig(annotations=[],
+                                                        volumes=[],
+                                                        volume_mounts=[]
+                                                        )
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
+                                     kube_executor_config)
+
+        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         pod.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.tolerations))
+        self.assertEqual('prod', pod.tolerations[1]['key'])
+
+    def test_make_pod_with_executor_config(self):
+        worker_config = WorkerConfiguration(self.kube_config)
+        kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
+                                                        tolerations=self.tolerations_config,
+                                                        annotations=[],
+                                                        volumes=[],
+                                                        volume_mounts=[]
+                                                        )
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
+                                     kube_executor_config)
+
+        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         pod.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.tolerations))
+        self.assertEqual('prod', pod.tolerations[1]['key'])
+
     def test_worker_pvc_dags(self):
         # Tests persistence volume config created when `dags_volume_claim` is set
         self.kube_config.dags_volume_claim = 'airflow-dags'


 
