You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/04 18:35:01 UTC
[airflow] branch v1-10-test updated (e65d442 -> 4827a67)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.
omit e65d442 [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
new 4827a67 [AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (e65d442)
\
N -- N -- N refs/heads/v1-10-test (4827a67)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
airflow/contrib/operators/kubernetes_pod_operator.py | 5 +++--
tests/runtime/kubernetes/test_kubernetes_executor.py | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
[airflow] 01/01: [AIRFLOW-5873] KubernetesPodOperator fixes and
test (#6524)
Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4827a67858ef33c4863a7938f3a6c38edf72dd42
Author: david <14...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:33:58 2020 -0700
[AIRFLOW-5873] KubernetesPodOperator fixes and test (#6524)
- `security_context` was missing from docs of `KubernetesPodOperator`
- `KubernetesPodOperator` kwarg `in_cluster` erroneously defaults to
False in comparison to `default_args.py`, also default `do_xcom_push`
was overwritten to False in contradiction to `BaseOperator`
- `KubernetesPodOperator` kwarg `resources` is erroneously passed to
`base_operator`; it should instead go only to `PodGenerator`. The two
have different syntax. (both on `master` and `v1-10-test` branches)
- `kubernetes/pod.py`: classes do not have `__slots__`
so they would accept arbitrary values in `setattr`
- Reduce the number of times the pod object is copied before execution
(cherry picked from commit cf38ddc0571634588883699f481c219a0bb2fbcd)
---
.../contrib/operators/kubernetes_pod_operator.py | 65 +++++++++++++++-------
airflow/kubernetes/pod.py | 21 +++++++
airflow/kubernetes/pod_generator.py | 1 -
.../runtime/kubernetes/test_kubernetes_executor.py | 4 +-
.../kubernetes/test_kubernetes_pod_operator.py | 25 +++++++--
5 files changed, 87 insertions(+), 29 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index bacf2ed..d1fba07 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -17,11 +17,15 @@
"""Executes task in a Kubernetes POD"""
import warnings
+import re
+
from airflow.exceptions import AirflowException
-from airflow.kubernetes import pod_generator, kube_client, pod_launcher
+from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod import Resources
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version
@@ -30,6 +34,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
"""
Execute a task in a Kubernetes Pod
+ .. note::
+ If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, use
+ :class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
+ simplifies the authorization process.
+
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
@@ -47,13 +56,13 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
- :param ports: ports for launched pod
- :type ports: list[airflow.kubernetes.models.port.Port]
- :param volume_mounts: volumeMounts for launched pod
- :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
- :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
- :type volumes: list[airflow.kubernetes.models.volume.Volume]
- :param labels: labels to apply to the Pod
+ :param ports: ports for launched pod.
+ :type ports: list[airflow.kubernetes.pod.Port]
+ :param volume_mounts: volumeMounts for launched pod.
+ :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
+ :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
+ :type volumes: list[airflow.kubernetes.volume.Volume]
+ :param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
@@ -62,10 +71,10 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
- :param secrets: Kubernetes secrets to inject in the container,
+ :param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume.
- :type secrets: list[airflow.kubernetes.models.secret.Secret]
- :param in_cluster: run kubernetes client with in_cluster configuration
+ :type secrets: list[airflow.kubernetes.secret.Secret]
+ :param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
@@ -86,6 +95,8 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file. (templated)
+ :param config_file: The path to the Kubernetes config file. (templated)
+ If not specified, default value is ``~/.kube/config``
:type config_file: str
:param do_xcom_push: If do_xcom_push is True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
@@ -103,9 +114,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
- pod runtime information (ip, namespace, nodeName, podName)
- :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
- :param dnspolicy: Specify a dnspolicy for the pod
+ pod runtime information (ip, namespace, nodeName, podName).
+ :type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
+ :param security_context: security options the pod should run with (PodSecurityContext).
+ :type security_context: dict
+ :param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
@@ -146,15 +159,18 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
configmaps=self.configmaps,
security_context=self.security_context,
dnspolicy=self.dnspolicy,
- resources=self.resources,
pod=self.full_pod_spec,
).gen_pod()
- pod = append_to_pod(pod, self.ports)
- pod = append_to_pod(pod, self.pod_runtime_info_envs)
- pod = append_to_pod(pod, self.volumes)
- pod = append_to_pod(pod, self.volume_mounts)
- pod = append_to_pod(pod, self.secrets)
+ pod = append_to_pod(
+ pod,
+ self.pod_runtime_info_envs +
+ self.ports +
+ self.resources +
+ self.secrets +
+ self.volumes +
+ self.volume_mounts
+ )
self.pod = pod
@@ -179,6 +195,13 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+ def _set_resources(self, resources):
+ return [Resources(**resources) if resources else Resources()]
+
+ def _set_name(self, name):
+ validate_key(name, max_length=63)
+ return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
@@ -244,7 +267,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
- self.resources = resources
+ self.resources = self._set_resources(resources)
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 53c9172..6a0e788 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -19,13 +19,27 @@ Classes for interacting with Kubernetes API
"""
import copy
+
import kubernetes.client.models as k8s
+
from airflow.kubernetes.k8s_model import K8SModel
class Resources(K8SModel):
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+ """
+ :param request_memory: requested memory
+ :type request_memory: str
+ :param request_cpu: requested CPU number
+ :type request_cpu: float | str
+ :param limit_memory: limit for memory usage
+ :type limit_memory: str
+ :param limit_cpu: Limit for CPU used
+ :type limit_cpu: float | str
+ :param limit_gpu: Limits for GPU used
+ :type limit_gpu: int
+ """
def __init__(
self,
request_memory=None,
@@ -40,12 +54,15 @@ class Resources(K8SModel):
self.limit_gpu = limit_gpu
def is_empty_resource_request(self):
+ """Whether resource is empty"""
return not self.has_limits() and not self.has_requests()
def has_limits(self):
+ """Whether resource has limits"""
return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None
def has_requests(self):
+ """Whether resource has requests"""
return self.request_cpu is not None or self.request_memory is not None
def to_k8s_client_obj(self):
@@ -62,10 +79,14 @@ class Resources(K8SModel):
class Port(K8SModel):
+ """POD port"""
+ __slots__ = ('name', 'container_port')
+
def __init__(
self,
name=None,
container_port=None):
+ """Creates port"""
self.name = name
self.container_port = container_port
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 3e1cb59..5b86161 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -233,7 +233,6 @@ class PodGenerator:
requests = {
'cpu': namespaced.get('request_cpu'),
'memory': namespaced.get('request_memory')
-
}
limits = {
'cpu': namespaced.get('limit_cpu'),
diff --git a/tests/runtime/kubernetes/test_kubernetes_executor.py b/tests/runtime/kubernetes/test_kubernetes_executor.py
index eb9b1ea..ac44f81 100644
--- a/tests/runtime/kubernetes/test_kubernetes_executor.py
+++ b/tests/runtime/kubernetes/test_kubernetes_executor.py
@@ -196,7 +196,7 @@ class TestKubernetesExecutor(unittest.TestCase):
self.ensure_dag_expected_state(host=host,
execution_date=execution_date,
dag_id=dag_id,
- expected_final_state='success', timeout=100)
+ expected_final_state='success', timeout=200)
def test_integration_run_dag_with_scheduler_failure(self):
host = KUBERNETES_HOST
@@ -229,7 +229,7 @@ class TestKubernetesExecutor(unittest.TestCase):
self.ensure_dag_expected_state(host=host,
execution_date=execution_date,
dag_id=dag_id,
- expected_final_state='success', timeout=100)
+ expected_final_state='success', timeout=200)
self.assertEqual(self._num_pods_in_namespace('test-namespace'),
0,
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index c9bc741..f533659 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -25,13 +25,13 @@ import pytest
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
-from airflow.kubernetes.volume import Volume
from airflow import AirflowException
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.version import version as airflow_version
from tests.compat import mock
@@ -67,6 +67,11 @@ class TestKubernetesPodOperator(unittest.TestCase):
'envFrom': [],
'name': 'base',
'ports': [],
+ 'resources': {'limits': {'cpu': None,
+ 'memory': None,
+ 'nvidia.com/gpu': None},
+ 'requests': {'cpu': None,
+ 'memory': None}},
'volumeMounts': [],
}],
'hostNetwork': False,
@@ -114,9 +119,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
labels={"foo": "bar"},
name="test",
task_id="task",
- config_file=file_path,
in_cluster=False,
do_xcom_push=False,
+ config_file=file_path,
cluster_context='default',
)
launcher_mock.return_value = (State.SUCCESS, None)
@@ -124,7 +129,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
client_mock.assert_called_once_with(
in_cluster=False,
cluster_context='default',
- config_file=file_path
+ config_file=file_path,
)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@@ -289,7 +294,17 @@ class TestKubernetesPodOperator(unittest.TestCase):
)
k.execute(None)
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.expected_pod['spec']['containers'][0]['resources'] = resources
+ self.expected_pod['spec']['containers'][0]['resources'] = {
+ 'requests': {
+ 'memory': '64Mi',
+ 'cpu': '250m'
+ },
+ 'limits': {
+ 'memory': '64Mi',
+ 'cpu': 0.25,
+ 'nvidia.com/gpu': None
+ }
+ }
self.assertEqual(self.expected_pod, actual_pod)
def test_pod_affinity(self):
@@ -500,7 +515,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
in_cluster=False,
do_xcom_push=False,
startup_timeout_seconds=5,
- service_account_name=bad_service_account_name
+ service_account_name=bad_service_account_name,
)
with self.assertRaises(ApiException):
k.execute(None)