You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/10/28 20:41:17 UTC
[airflow] branch main updated: Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0c26ec07be Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)
0c26ec07be is described below
commit 0c26ec07be96ae250dd2052f3c3bf552221d0e03
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Fri Oct 28 23:40:55 2022 +0300
Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)
* Drop support for providing ``resource`` as dict in ``KubernetesPodOperator``
* update tests
* remove resources test from backcompat
---
airflow/providers/cncf/kubernetes/CHANGELOG.rst | 7 ++++-
.../backcompat/backwards_compat_converters.py | 14 ----------
.../cncf/kubernetes/operators/kubernetes_pod.py | 21 ++++++---------
kubernetes_tests/test_kubernetes_pod_operator.py | 24 ++++++++++++++++-
.../test_kubernetes_pod_operator_backcompat.py | 31 ----------------------
5 files changed, 37 insertions(+), 60 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index b379dc45be..53a457cc52 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -32,6 +32,8 @@ Breaking changes
Previously KubernetesPodOperator considered some settings from the Airflow config's ``kubernetes`` section. Such consideration was deprecated in 4.1.0 and is now removed. If you previously relied on the Airflow config, and you want client generation to have non-default configuration, you will need to define your configuration in an Airflow connection and set KPO to use the connection. See kubernetes provider documentation on defining a kubernetes Airflow connection for details.
+Drop support for providing ``resources`` as a dict in ``KubernetesPodOperator``. You should use ``container_resources`` with a ``V1ResourceRequirements`` object instead.
+
Features
~~~~~~~~
@@ -97,6 +99,10 @@ Bug Fixes
* ``Revert "Fix await_container_completion condition (#23883)" (#24474)``
* ``Update providers to use functools compat for ''cached_property'' (#24582)``
+Misc
+~~~~
+* ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``
+
.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
* ``Only assert stuff for mypy when type checking (#24937)``
@@ -105,7 +111,6 @@ Bug Fixes
* ``Move provider dependencies to inside provider folders (#24672)``
* ``Use our yaml util in all providers (#24720)``
* ``Remove 'hook-class-names' from provider.yaml (#24702)``
- * ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``
4.1.0
.....
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index dfb6e31038..9d37da983e 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -62,20 +62,6 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
-def convert_resources(resources) -> k8s.V1ResourceRequirements:
- """
- Converts an airflow Resources object into a k8s.V1ResourceRequirements
-
- :param resources:
- :return: k8s.V1ResourceRequirements
- """
- if isinstance(resources, dict):
- from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
-
- resources = Resources(**resources)
- return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)
-
-
def convert_port(port) -> k8s.V1ContainerPort:
"""
Converts an airflow Port object into a k8s.V1ContainerPort
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 447d185ddb..a335660eb9 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -39,7 +39,6 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
convert_image_pull_secrets,
convert_pod_runtime_info_env,
convert_port,
- convert_resources,
convert_toleration,
convert_volume,
convert_volume_mount,
@@ -213,21 +212,17 @@ class KubernetesPodOperator(BaseOperator):
pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
- resources: dict[str, Any] | None = None,
**kwargs,
) -> None:
-
- if isinstance(resources, k8s.V1ResourceRequirements):
- warnings.warn(
+ # TODO: remove in provider 6.0.0 release. This is a mitigation step to advise users to switch to the
+ # container_resources parameter.
+ if isinstance(kwargs.get("resources"), k8s.V1ResourceRequirements):
+ raise AirflowException(
"Specifying resources for the launched pod with 'resources' is deprecated. "
- "Use 'container_resources' instead.",
- category=DeprecationWarning,
- stacklevel=2,
+ "Use 'container_resources' instead."
)
- container_resources = resources
- resources = None
- super().__init__(resources=resources, **kwargs)
+ super().__init__(**kwargs)
self.kubernetes_conn_id = kubernetes_conn_id
self.do_xcom_push = do_xcom_push
self.image = image
@@ -263,7 +258,7 @@ class KubernetesPodOperator(BaseOperator):
self.node_selector = {}
self.annotations = annotations or {}
self.affinity = convert_affinity(affinity) if affinity else {}
- self.k8s_resources = convert_resources(container_resources) if container_resources else {}
+ self.container_resources = container_resources
self.config_file = config_file
self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
self.service_account_name = service_account_name
@@ -553,7 +548,7 @@ class KubernetesPodOperator(BaseOperator):
command=self.cmds,
ports=self.ports,
image_pull_policy=self.image_pull_policy,
- resources=self.k8s_resources,
+ resources=self.container_resources,
volume_mounts=self.volume_mounts,
args=self.arguments,
env=self.env_vars,
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 72a09955c0..e1e6812018 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -120,7 +120,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
"command": ["bash", "-cx"],
"env": [],
"envFrom": [],
- "resources": {},
"name": "base",
"ports": [],
"volumeMounts": [],
@@ -1150,3 +1149,26 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
with pytest.raises(AirflowException):
k.execute(context)
create_mock.assert_called_once()
+
+ def test_using_resources(self):
+ exception_message = (
+ "Specifying resources for the launched pod with 'resources' is deprecated. "
+ "Use 'container_resources' instead."
+ )
+ with pytest.raises(AirflowException, match=exception_message):
+ resources = k8s.V1ResourceRequirements(
+ requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
+ limits={"memory": "64Mi", "cpu": 0.25, "nvidia.com/gpu": None, "ephemeral-storage": "2Gi"},
+ )
+ KubernetesPodOperator(
+ namespace="default",
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels=self.labels,
+ name="test-" + str(random.randint(0, 1000000)),
+ task_id="task" + self.get_current_task_name(),
+ in_cluster=False,
+ do_xcom_push=False,
+ resources=resources,
+ )
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 5528b251e8..71b314ecae 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -107,7 +107,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
"command": ["bash", "-cx"],
"env": [],
"envFrom": [],
- "resources": {},
"name": "base",
"ports": [],
"volumeMounts": [],
@@ -194,36 +193,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.expected_pod["spec"]["nodeSelector"] = node_selectors
assert self.expected_pod == actual_pod
- def test_pod_resources(self):
- resources = {
- "limit_cpu": 0.25,
- "limit_memory": "64Mi",
- "limit_ephemeral_storage": "2Gi",
- "request_cpu": "250m",
- "request_memory": "64Mi",
- "request_ephemeral_storage": "1Gi",
- }
- k = KubernetesPodOperator(
- namespace="default",
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- container_resources=resources,
- )
- context = create_context(k)
- k.execute(context)
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.expected_pod["spec"]["containers"][0]["resources"] = {
- "requests": {"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
- "limits": {"memory": "64Mi", "cpu": 0.25, "ephemeral-storage": "2Gi"},
- }
- assert self.expected_pod == actual_pod
-
def test_pod_affinity(self):
affinity = {
"nodeAffinity": {