You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/04 18:35:01 UTC

[airflow] branch v1-10-test updated (e65d442 -> 4827a67)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit e65d442  [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
     new 4827a67  [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e65d442)
            \
             N -- N -- N   refs/heads/v1-10-test (4827a67)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/contrib/operators/kubernetes_pod_operator.py | 5 +++--
 tests/runtime/kubernetes/test_kubernetes_executor.py | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)


[airflow] 01/01: [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4827a67858ef33c4863a7938f3a6c38edf72dd42
Author: david <14...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:33:58 2020 -0700

    [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
    
    - `security_context` was missing from docs of `KubernetesPodOperator`
    - `KubernetesPodOperator` kwarg `in_cluster` erroneously defaulted to
    False, in contradiction to `default_args.py`; likewise, the default of
    `do_xcom_push` was overridden to False, in contradiction to `BaseOperator`
    - `KubernetesPodOperator` kwarg `resources` was erroneously passed to
    `base_operator`; it should only go to `PodGenerator`, because the two
    use different syntax. (both on `master` and `v1-10-test` branches)
    - `kubernetes/pod.py`: classes do not have `__slots__`
    so they would accept arbitrary values in `setattr`
    - Reduce the number of times the pod object is copied before execution
    
    (cherry picked from commit cf38ddc0571634588883699f481c219a0bb2fbcd)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 65 +++++++++++++++-------
 airflow/kubernetes/pod.py                          | 21 +++++++
 airflow/kubernetes/pod_generator.py                |  1 -
 .../runtime/kubernetes/test_kubernetes_executor.py |  4 +-
 .../kubernetes/test_kubernetes_pod_operator.py     | 25 +++++++--
 5 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index bacf2ed..d1fba07 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -17,11 +17,15 @@
 """Executes task in a Kubernetes POD"""
 import warnings
 
+import re
+
 from airflow.exceptions import AirflowException
-from airflow.kubernetes import pod_generator, kube_client, pod_launcher
+from airflow.kubernetes import kube_client, pod_generator, pod_launcher
 from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod import Resources
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import validate_key
 from airflow.utils.state import State
 from airflow.version import version as airflow_version
 
@@ -30,6 +34,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     """
     Execute a task in a Kubernetes Pod
 
+    .. note::
+        If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, use
+        :class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
+        simplifies the authorization process.
+
     :param image: Docker image you wish to launch. Defaults to hub.docker.com,
         but fully qualified URLS will point to custom repositories.
     :type image: str
@@ -47,13 +56,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                                If more than one secret is required, provide a
                                comma separated list: secret_a,secret_b
     :type image_pull_secrets: str
-    :param ports: ports for launched pod
-    :type ports: list[airflow.kubernetes.models.port.Port]
-    :param volume_mounts: volumeMounts for launched pod
-    :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
-    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
-    :type volumes: list[airflow.kubernetes.models.volume.Volume]
-    :param labels: labels to apply to the Pod
+    :param ports: ports for launched pod.
+    :type ports: list[airflow.kubernetes.pod.Port]
+    :param volume_mounts: volumeMounts for launched pod.
+    :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
+    :type volumes: list[airflow.kubernetes.volume.Volume]
+    :param labels: labels to apply to the Pod.
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
     :type startup_timeout_seconds: int
@@ -62,10 +71,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :type name: str
     :param env_vars: Environment variables initialized in the container. (templated)
     :type env_vars: dict
-    :param secrets: Kubernetes secrets to inject in the container,
+    :param secrets: Kubernetes secrets to inject in the container.
         They can be exposed as environment vars or files in a volume.
-    :type secrets: list[airflow.kubernetes.models.secret.Secret]
-    :param in_cluster: run kubernetes client with in_cluster configuration
+    :type secrets: list[airflow.kubernetes.secret.Secret]
+    :param in_cluster: run kubernetes client with in_cluster configuration.
     :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
@@ -86,6 +95,8 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :param node_selectors: A dict containing a group of scheduling rules.
     :type node_selectors: dict
     :param config_file: The path to the Kubernetes config file. (templated)
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, default value is ``~/.kube/config``
     :type config_file: str
     :param do_xcom_push: If do_xcom_push is True, the content of the file
         /airflow/xcom/return.json in the container will also be pushed to an
@@ -103,9 +114,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         want mount as env variables.
     :type configmaps: list[str]
     :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName)
-    :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
-    :param dnspolicy: Specify a dnspolicy for the pod
+                                  pod runtime information (ip, namespace, nodeName, podName).
+    :type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
+    :param security_context: security options the pod should run with (PodSecurityContext).
+    :type security_context: dict
+    :param dnspolicy: dnspolicy for the pod.
     :type dnspolicy: str
     :param full_pod_spec: The complete podSpec
     :type full_pod_spec: kubernetes.client.models.V1Pod
@@ -146,15 +159,18 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 configmaps=self.configmaps,
                 security_context=self.security_context,
                 dnspolicy=self.dnspolicy,
-                resources=self.resources,
                 pod=self.full_pod_spec,
             ).gen_pod()
 
-            pod = append_to_pod(pod, self.ports)
-            pod = append_to_pod(pod, self.pod_runtime_info_envs)
-            pod = append_to_pod(pod, self.volumes)
-            pod = append_to_pod(pod, self.volume_mounts)
-            pod = append_to_pod(pod, self.secrets)
+            pod = append_to_pod(
+                pod,
+                self.pod_runtime_info_envs +
+                self.ports +
+                self.resources +
+                self.secrets +
+                self.volumes +
+                self.volume_mounts
+            )
 
             self.pod = pod
 
@@ -179,6 +195,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
 
+    def _set_resources(self, resources):
+        return [Resources(**resources) if resources else Resources()]
+
+    def _set_name(self, name):
+        validate_key(name, max_length=63)
+        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+
     @apply_defaults
     def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
                  namespace,
@@ -244,7 +267,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.node_selectors = node_selectors or {}
         self.annotations = annotations or {}
         self.affinity = affinity or {}
-        self.resources = resources
+        self.resources = self._set_resources(resources)
         self.config_file = config_file
         self.image_pull_secrets = image_pull_secrets
         self.service_account_name = service_account_name
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 53c9172..6a0e788 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -19,13 +19,27 @@ Classes for interacting with Kubernetes API
 """
 
 import copy
+
 import kubernetes.client.models as k8s
+
 from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
     __slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
 
+    """
+    :param request_memory: requested memory
+    :type request_memory: str
+    :param request_cpu: requested CPU number
+    :type request_cpu: float | str
+    :param limit_memory: limit for memory usage
+    :type limit_memory: str
+    :param limit_cpu: Limit for CPU used
+    :type limit_cpu: float | str
+    :param limit_gpu: Limits for GPU used
+    :type limit_gpu: int
+    """
     def __init__(
             self,
             request_memory=None,
@@ -40,12 +54,15 @@ class Resources(K8SModel):
         self.limit_gpu = limit_gpu
 
     def is_empty_resource_request(self):
+        """Whether resource is empty"""
         return not self.has_limits() and not self.has_requests()
 
     def has_limits(self):
+        """Whether resource has limits"""
         return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None
 
     def has_requests(self):
+        """Whether resource has requests"""
         return self.request_cpu is not None or self.request_memory is not None
 
     def to_k8s_client_obj(self):
@@ -62,10 +79,14 @@ class Resources(K8SModel):
 
 
 class Port(K8SModel):
+    """POD port"""
+    __slots__ = ('name', 'container_port')
+
     def __init__(
             self,
             name=None,
             container_port=None):
+        """Creates port"""
         self.name = name
         self.container_port = container_port
 
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 3e1cb59..5b86161 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -233,7 +233,6 @@ class PodGenerator:
             requests = {
                 'cpu': namespaced.get('request_cpu'),
                 'memory': namespaced.get('request_memory')
-
             }
             limits = {
                 'cpu': namespaced.get('limit_cpu'),
diff --git a/tests/runtime/kubernetes/test_kubernetes_executor.py b/tests/runtime/kubernetes/test_kubernetes_executor.py
index eb9b1ea..ac44f81 100644
--- a/tests/runtime/kubernetes/test_kubernetes_executor.py
+++ b/tests/runtime/kubernetes/test_kubernetes_executor.py
@@ -196,7 +196,7 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.ensure_dag_expected_state(host=host,
                                        execution_date=execution_date,
                                        dag_id=dag_id,
-                                       expected_final_state='success', timeout=100)
+                                       expected_final_state='success', timeout=200)
 
     def test_integration_run_dag_with_scheduler_failure(self):
         host = KUBERNETES_HOST
@@ -229,7 +229,7 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.ensure_dag_expected_state(host=host,
                                        execution_date=execution_date,
                                        dag_id=dag_id,
-                                       expected_final_state='success', timeout=100)
+                                       expected_final_state='success', timeout=200)
 
         self.assertEqual(self._num_pods_in_namespace('test-namespace'),
                          0,
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index c9bc741..f533659 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -25,13 +25,13 @@ import pytest
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
-from airflow.kubernetes.volume import Volume
 from airflow import AirflowException
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.kubernetes.pod import Port
 from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.version import version as airflow_version
 from tests.compat import mock
@@ -67,6 +67,11 @@ class TestKubernetesPodOperator(unittest.TestCase):
                     'envFrom': [],
                     'name': 'base',
                     'ports': [],
+                    'resources': {'limits': {'cpu': None,
+                                             'memory': None,
+                                             'nvidia.com/gpu': None},
+                                  'requests': {'cpu': None,
+                                               'memory': None}},
                     'volumeMounts': [],
                 }],
                 'hostNetwork': False,
@@ -114,9 +119,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             labels={"foo": "bar"},
             name="test",
             task_id="task",
-            config_file=file_path,
             in_cluster=False,
             do_xcom_push=False,
+            config_file=file_path,
             cluster_context='default',
         )
         launcher_mock.return_value = (State.SUCCESS, None)
@@ -124,7 +129,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
         client_mock.assert_called_once_with(
             in_cluster=False,
             cluster_context='default',
-            config_file=file_path
+            config_file=file_path,
         )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@@ -289,7 +294,17 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         k.execute(None)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod['spec']['containers'][0]['resources'] = resources
+        self.expected_pod['spec']['containers'][0]['resources'] = {
+            'requests': {
+                'memory': '64Mi',
+                'cpu': '250m'
+            },
+            'limits': {
+                'memory': '64Mi',
+                'cpu': 0.25,
+                'nvidia.com/gpu': None
+            }
+        }
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_affinity(self):
@@ -500,7 +515,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
             in_cluster=False,
             do_xcom_push=False,
             startup_timeout_seconds=5,
-            service_account_name=bad_service_account_name
+            service_account_name=bad_service_account_name,
         )
         with self.assertRaises(ApiException):
             k.execute(None)