You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/10/28 20:41:17 UTC

[airflow] branch main updated: Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c26ec07be Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)
0c26ec07be is described below

commit 0c26ec07be96ae250dd2052f3c3bf552221d0e03
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Fri Oct 28 23:40:55 2022 +0300

    Drop support for providing ``resource`` as dict in ``KubernetesPodOperator`` (#27197)
    
    * Drop support for providing ``resource`` as dict in ``KubernetesPodOperator``
    
    * update tests
    
    * remove resources test from backcompat
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    |  7 ++++-
 .../backcompat/backwards_compat_converters.py      | 14 ----------
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 21 ++++++---------
 kubernetes_tests/test_kubernetes_pod_operator.py   | 24 ++++++++++++++++-
 .../test_kubernetes_pod_operator_backcompat.py     | 31 ----------------------
 5 files changed, 37 insertions(+), 60 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index b379dc45be..53a457cc52 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -32,6 +32,8 @@ Breaking changes
 
 Previously KubernetesPodOperator considered some settings from the Airflow config's ``kubernetes`` section.  Such consideration was deprecated in 4.1.0 and is now removed.  If you previously relied on the Airflow config, and you want client generation to have non-default configuration, you will need to define your configuration in an Airflow connection and set KPO to use the connection.  See kubernetes provider documentation on defining a kubernetes Airflow connection for details.
 
+Drop support for providing ``resource`` as dict in ``KubernetesPodOperator``. You should use ``container_resources`` with ``V1ResourceRequirements``.
+
 Features
 ~~~~~~~~
 
@@ -97,6 +99,10 @@ Bug Fixes
 * ``Revert "Fix await_container_completion condition (#23883)" (#24474)``
 * ``Update providers to use functools compat for ''cached_property'' (#24582)``
 
+Misc
+~~~~
+* ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``
+
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
    * ``Only assert stuff for mypy when type checking (#24937)``
@@ -105,7 +111,6 @@ Bug Fixes
    * ``Move provider dependencies to inside provider folders (#24672)``
    * ``Use our yaml util in all providers (#24720)``
    * ``Remove 'hook-class-names' from provider.yaml (#24702)``
-   * ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``
 
 4.1.0
 .....
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index dfb6e31038..9d37da983e 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -62,20 +62,6 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
     return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
 
 
-def convert_resources(resources) -> k8s.V1ResourceRequirements:
-    """
-    Converts an airflow Resources object into a k8s.V1ResourceRequirements
-
-    :param resources:
-    :return: k8s.V1ResourceRequirements
-    """
-    if isinstance(resources, dict):
-        from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
-
-        resources = Resources(**resources)
-    return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)
-
-
 def convert_port(port) -> k8s.V1ContainerPort:
     """
     Converts an airflow Port object into a k8s.V1ContainerPort
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 447d185ddb..a335660eb9 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -39,7 +39,6 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
     convert_image_pull_secrets,
     convert_pod_runtime_info_env,
     convert_port,
-    convert_resources,
     convert_toleration,
     convert_volume,
     convert_volume_mount,
@@ -213,21 +212,17 @@ class KubernetesPodOperator(BaseOperator):
         pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
         termination_grace_period: int | None = None,
         configmaps: list[str] | None = None,
-        resources: dict[str, Any] | None = None,
         **kwargs,
     ) -> None:
-
-        if isinstance(resources, k8s.V1ResourceRequirements):
-            warnings.warn(
+        # TODO: remove in provider 6.0.0 release. This is a mitigation step to advise users to switch to the
+        # container_resources parameter.
+        if isinstance(kwargs.get("resources"), k8s.V1ResourceRequirements):
+            raise AirflowException(
                 "Specifying resources for the launched pod with 'resources' is deprecated. "
-                "Use 'container_resources' instead.",
-                category=DeprecationWarning,
-                stacklevel=2,
+                "Use 'container_resources' instead."
             )
-            container_resources = resources
-            resources = None
 
-        super().__init__(resources=resources, **kwargs)
+        super().__init__(**kwargs)
         self.kubernetes_conn_id = kubernetes_conn_id
         self.do_xcom_push = do_xcom_push
         self.image = image
@@ -263,7 +258,7 @@ class KubernetesPodOperator(BaseOperator):
             self.node_selector = {}
         self.annotations = annotations or {}
         self.affinity = convert_affinity(affinity) if affinity else {}
-        self.k8s_resources = convert_resources(container_resources) if container_resources else {}
+        self.container_resources = container_resources
         self.config_file = config_file
         self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
         self.service_account_name = service_account_name
@@ -553,7 +548,7 @@ class KubernetesPodOperator(BaseOperator):
                         command=self.cmds,
                         ports=self.ports,
                         image_pull_policy=self.image_pull_policy,
-                        resources=self.k8s_resources,
+                        resources=self.container_resources,
                         volume_mounts=self.volume_mounts,
                         args=self.arguments,
                         env=self.env_vars,
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 72a09955c0..e1e6812018 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -120,7 +120,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                         "command": ["bash", "-cx"],
                         "env": [],
                         "envFrom": [],
-                        "resources": {},
                         "name": "base",
                         "ports": [],
                         "volumeMounts": [],
@@ -1150,3 +1149,26 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             with pytest.raises(AirflowException):
                 k.execute(context)
             create_mock.assert_called_once()
+
+    def test_using_resources(self):
+        exception_message = (
+            "Specifying resources for the launched pod with 'resources' is deprecated. "
+            "Use 'container_resources' instead."
+        )
+        with pytest.raises(AirflowException, match=exception_message):
+            resources = k8s.V1ResourceRequirements(
+                requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
+                limits={"memory": "64Mi", "cpu": 0.25, "nvidia.com/gpu": None, "ephemeral-storage": "2Gi"},
+            )
+            KubernetesPodOperator(
+                namespace="default",
+                image="ubuntu:16.04",
+                cmds=["bash", "-cx"],
+                arguments=["echo 10"],
+                labels=self.labels,
+                name="test-" + str(random.randint(0, 1000000)),
+                task_id="task" + self.get_current_task_name(),
+                in_cluster=False,
+                do_xcom_push=False,
+                resources=resources,
+            )
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 5528b251e8..71b314ecae 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -107,7 +107,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                         "command": ["bash", "-cx"],
                         "env": [],
                         "envFrom": [],
-                        "resources": {},
                         "name": "base",
                         "ports": [],
                         "volumeMounts": [],
@@ -194,36 +193,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.expected_pod["spec"]["nodeSelector"] = node_selectors
         assert self.expected_pod == actual_pod
 
-    def test_pod_resources(self):
-        resources = {
-            "limit_cpu": 0.25,
-            "limit_memory": "64Mi",
-            "limit_ephemeral_storage": "2Gi",
-            "request_cpu": "250m",
-            "request_memory": "64Mi",
-            "request_ephemeral_storage": "1Gi",
-        }
-        k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            container_resources=resources,
-        )
-        context = create_context(k)
-        k.execute(context)
-        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod["spec"]["containers"][0]["resources"] = {
-            "requests": {"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
-            "limits": {"memory": "64Mi", "cpu": 0.25, "ephemeral-storage": "2Gi"},
-        }
-        assert self.expected_pod == actual_pod
-
     def test_pod_affinity(self):
         affinity = {
             "nodeAffinity": {