Posted to commits@airflow.apache.org by "raphaelauv (via GitHub)" <gi...@apache.org> on 2023/03/05 14:06:18 UTC

[GitHub] [airflow] raphaelauv opened a new pull request, #29930: KubernetesResourceOperator - create PVC & delete PVC

raphaelauv opened a new pull request, #29930:
URL: https://github.com/apache/airflow/pull/29930

   Add an abstract KubernetesResourceOperator
   
   and two concrete operators: create_pvc and delete_pvc.
   
   Documentation and tests are still missing.
   
   Example of use:
   
   ```python
   
   pvc_name = "{{ run_id }}"
   
   pvc_conf = f"""
   apiVersion: v1
   kind: PersistentVolumeClaim
   metadata:
     name: {pvc_name}
   spec:
     accessModes:
       - ReadWriteOnce
     storageClassName: standard-rwo
     resources:
       requests:
         storage: 500Gi
   """
   
   create_pvc = kubernetesPVCcreate(
       task_id="create_pvc",
       pvc_conf=pvc_conf,
   )
   ```
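
A note on the snippet above: the f-string inserts the value of `pvc_name` verbatim, so Python does not re-interpret its double braces and the manifest still carries the literal `{{ run_id }}` placeholder. Presumably the operator's templating renders it at run time; that assumes the manifest argument is a templated field. A minimal standard-library check of the Python side only:

```python
# Sketch: an f-string inserts the *value* of pvc_name verbatim, so the Jinja
# placeholder survives and stays available for later template rendering.
pvc_name = "{{ run_id }}"

pvc_conf = f"""
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {pvc_name}
"""

# The last non-empty line is the metadata name entry.
name_line = pvc_conf.strip().splitlines()[-1]
print(name_line)
```

Writing `{{ run_id }}` directly inside the f-string would behave differently: doubled braces are an escape there and would collapse to single braces.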


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] hussein-awala merged pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala merged PR #29930:
URL: https://github.com/apache/airflow/pull/29930




[GitHub] [airflow] eladkal commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1573971940

   @jedcunningham any further comments?




[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1484949154

   Hey @potiuk, I think I fixed the issues related to the PR. Could you have a second look? Thanks :+1:




[GitHub] [airflow] potiuk commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1518673396

   Tests failing :( 




[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1541912389

   @jedcunningham could you review again? Thank you.




[GitHub] [airflow] eladkal commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1184192136


##########
tests/always/test_project_structure.py:
##########
@@ -428,6 +428,11 @@ class TestCncfProviderProjectStructure(ExampleCoverageTest):
         "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
         "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
     }
+    BASE_CLASSES = {"airflow.providers.cncf.kubernetes.operators.resource.KubernetesResourceBaseOperator"}
+
+    MISSING_EXAMPLES_FOR_CLASSES = {
+        "airflow.providers.cncf.kubernetes.operators.resource.KubernetesResourceBaseOperator"
+    }

Review Comment:
   That's odd.
   A base class by definition should not have example DAGs. I don't think it's required to list them here?





[GitHub] [airflow] raphaelauv commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1185624181


##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    # get group and version from apiVersion
+    group, _, version = yml_document["apiVersion"].partition("/")
+    if version == "":
+        version = group
+        group = "core"
+    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
+    # Only replace the last instance
+    group = "".join(group.rsplit(".k8s.io", 1))
+    # convert group name from DNS subdomain format to
+    # python class name convention
+    group = "".join(word.capitalize() for word in group.split("."))
+    fcn_to_call = f"{group}{version.capitalize()}Api"
+    k8s_api = getattr(client, fcn_to_call)(k8s_client)
+    # Replace CamelCased action_type into snake_case
+    kind = yml_document["kind"]
+    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
+    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
+
+    # Decide which namespace we are going to use for deleting the object
+    # IMPORTANT: Its ignore namespace in args:
+    #    create_from_yaml_single_item have same behaviour
+    if "namespace" in yml_document["metadata"]:
+        namespace = yml_document["metadata"]["namespace"]
+    name = yml_document["metadata"]["name"]
+
+    # Expect the user to delete namespaced objects more often
+    if hasattr(k8s_api, f"delete_namespaced_{kind}"):
+        resp: client.V1Status = getattr(k8s_api, f"delete_namespaced_{kind}")(
+            name=name, namespace=namespace, body=body, **kwargs
+        )
+    else:
+        resp: client.V1Status = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs)
+    if verbose:
+        print(f"{kind} deleted. status='{str(resp.status)}'")
+    return resp
+
+
+class FailToDeleteError(Exception):
+    """
+    An exception class for handling error if an error occurred when
+    handling a yaml file during deletion of the resource.
+    """
+
+    def __init__(self, api_exceptions):
+        self.api_exceptions = api_exceptions
+
+    def __str__(self):
+        msg = ""
+        for api_exception in self.api_exceptions:
+            msg += f"Error from server ({api_exception.reason}):{api_exception.body}"

Review Comment:
   I added a `\n`
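
For illustration, a sketch of what a newline-separated message could look like. `FakeApiException` is a hypothetical stand-in for `kubernetes.client.rest.ApiException`, and joining with `"\n"` is one possible way to add the separator, not necessarily the change made in the PR:

```python
class FakeApiException(Exception):
    """Hypothetical stand-in for kubernetes.client.rest.ApiException."""

    def __init__(self, reason: str, body: str) -> None:
        super().__init__(reason)
        self.reason = reason
        self.body = body


class FailToDeleteError(Exception):
    """Collects the API errors raised while deleting several resources."""

    def __init__(self, api_exceptions: list) -> None:
        super().__init__(api_exceptions)
        self.api_exceptions = api_exceptions

    def __str__(self) -> str:
        # One line per failed deletion, so the errors do not run together.
        return "\n".join(
            f"Error from server ({exc.reason}):{exc.body}" for exc in self.api_exceptions
        )


err = FailToDeleteError(
    [FakeApiException("NotFound", "pvc-a"), FakeApiException("Forbidden", "pvc-b")]
)
print(err)
```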
   



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########


Review Comment:
   done





[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1503507512

   Just rebased on the main branch.




[GitHub] [airflow] jedcunningham commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1163261287


##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict
+from airflow.utils import yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = "kubernetes_default",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+        self.yaml_conf = yaml_conf
+
+    @cached_property
+    def client(self) -> ApiClient:
+        return self.hook.api_client
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        hook = KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+        return hook
+
+    def get_namespace(self) -> str:
+        if self._namespace:
+            return self._namespace
+        else:
+            tmp = self.hook.get_namespace()
+            if tmp:
+                return tmp
+            else:
+                return "default"

Review Comment:
   ```suggestion
           return self.hook.get_namespace() or "default"
   ```
   
   Simpler.
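
One reading of the suggestion, sketched outside Airflow: the whole fallback chain can be written with `or`. `StubHook` and `resolve_namespace` are hypothetical names used only for this illustration; the real method lives on the operator and calls `KubernetesHook.get_namespace()`.

```python
from __future__ import annotations


class StubHook:
    """Hypothetical stand-in for KubernetesHook; only get_namespace() is modelled."""

    def __init__(self, namespace: str | None) -> None:
        self._namespace = namespace

    def get_namespace(self) -> str | None:
        return self._namespace


def resolve_namespace(explicit: str | None, hook: StubHook) -> str:
    # Falsy values (None or "") fall through to the next candidate,
    # which matches the truthiness checks in the original if/else chain.
    return explicit or hook.get_namespace() or "default"
```

Because `or` tests truthiness, an empty string is treated like `None`, the same as in the nested version.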



##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict
+from airflow.utils import yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = "kubernetes_default",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+        self.yaml_conf = yaml_conf
+
+    @cached_property
+    def client(self) -> ApiClient:
+        return self.hook.api_client
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        hook = KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+        return hook
+
+    def get_namespace(self) -> str:
+        if self._namespace:
+            return self._namespace
+        else:
+            tmp = self.hook.get_namespace()
+            if tmp:
+                return tmp
+            else:
+                return "default"
+
+
+class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
+    """Create a resource in a kubernetes."""
+
+    def execute(self, context) -> None:
+        print(self.yaml_conf)
+        create_from_yaml(
+            k8s_client=self.client,
+            yaml_objects=[yaml.safe_load(self.yaml_conf)],
+            namespace=self.get_namespace(),
+        )
+
+
+class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
+    """Delete a resource in a kubernetes."""
+
+    def execute(self, context) -> None:
+        print(self.yaml_conf)

Review Comment:
   ```suggestion
   ```



##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict
+from airflow.utils import yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = "kubernetes_default",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+        self.yaml_conf = yaml_conf
+
+    @cached_property
+    def client(self) -> ApiClient:
+        return self.hook.api_client
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        hook = KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+        return hook
+
+    def get_namespace(self) -> str:
+        if self._namespace:
+            return self._namespace
+        else:
+            tmp = self.hook.get_namespace()
+            if tmp:
+                return tmp
+            else:
+                return "default"
+
+
+class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
+    """Create a resource in a kubernetes."""
+
+    def execute(self, context) -> None:
+        print(self.yaml_conf)

Review Comment:
   ```suggestion
   ```
   
   Left over debugging?



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (
+    KubernetesCreateResourceOperator,
+    KubernetesDeleteResourceOperator,
+)
+
+# [END import_module]
+
+
+# [START instantiate_dag]
+
+pvc_name = "toto"
+
+pvc_conf = f"""
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {pvc_name}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  storageClassName: standard-rwo
+  resources:
+    requests:
+      storage: 500Gi

Review Comment:
   I feel like we should use something way smaller?



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (
+    KubernetesCreateResourceOperator,
+    KubernetesDeleteResourceOperator,
+)
+
+# [END import_module]
+
+
+# [START instantiate_dag]
+
+pvc_name = "toto"
+
+pvc_conf = f"""
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {pvc_name}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  storageClassName: standard-rwo

Review Comment:
   Might be more portable to just use the default storage class?
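
A sketch of what the example manifest might look like with this comment and the earlier one about size applied. Leaving out `storageClassName` lets the cluster fall back to its default storage class, if one is configured. The `1Gi` request is an arbitrary illustrative value, not one taken from the PR.

```python
# Sketch of a more portable manifest: no storageClassName, so the cluster's
# default storage class applies (if one is configured), and a small request.
# "1Gi" is an arbitrary illustrative size, not a value from the PR.
pvc_name = "toto"

pvc_conf = f"""
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {pvc_name}
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
"""
```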



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(

Review Comment:
   ```suggestion
   def delete_from_dict(
       *,
   ```
   
   Let's do kwargs only from the start. Can you add some typing here too?
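
A signature-only sketch of the request, to show the effect of the bare `*`. The type hints are assumptions: `Any` stands in for `kubernetes.client.ApiClient` and `client.V1DeleteOptions` so that the snippet runs without the Kubernetes client installed, and the function body is omitted.

```python
from __future__ import annotations

from typing import Any


def delete_from_dict(
    *,
    k8s_client: Any,  # would be kubernetes.client.ApiClient in the real module
    yml_document: dict[str, Any],
    verbose: bool = False,
    namespace: str = "default",
    body: Any | None = None,  # would be client.V1DeleteOptions | None
    **kwargs: Any,
) -> None:
    """Signature-only stub; the deletion logic is omitted in this sketch."""


try:
    delete_from_dict(object(), {"kind": "Pod"})  # positional call is rejected
except TypeError as exc:
    positional_error = exc
else:
    positional_error = None

# Keyword call is accepted.
delete_from_dict(k8s_client=object(), yml_document={"kind": "Pod"})
```

Keyword-only parameters keep call sites readable and let parameters be reordered later without breaking callers.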



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    # get group and version from apiVersion
+    group, _, version = yml_document["apiVersion"].partition("/")
+    if version == "":
+        version = group
+        group = "core"
+    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
+    # Only replace the last instance
+    group = "".join(group.rsplit(".k8s.io", 1))
+    # convert group name from DNS subdomain format to
+    # python class name convention
+    group = "".join(word.capitalize() for word in group.split("."))
+    fcn_to_call = f"{group}{version.capitalize()}Api"
+    k8s_api = getattr(client, fcn_to_call)(k8s_client)
+    # Replace CamelCased action_type into snake_case
+    kind = yml_document["kind"]
+    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
+    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
+
+    # Decide which namespace we are going to use for deleting the object
+    # IMPORTANT: Its ignore namespace in args:

Review Comment:
   ```suggestion
       # IMPORTANT: the docs namespace takes precedence over the namespace in args
   ```
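
The precedence rule in that comment, isolated as a small sketch. `pick_namespace` is a hypothetical helper name; in the hunk above the same logic is written inline.

```python
from __future__ import annotations

from typing import Any


def pick_namespace(yml_document: dict[str, Any], namespace: str = "default") -> str:
    """Hypothetical helper isolating the namespace choice from the hunk above."""
    metadata = yml_document["metadata"]
    # The namespace declared in the document wins over the argument.
    if "namespace" in metadata:
        return metadata["namespace"]
    return namespace
```

So a manifest that declares `metadata.namespace` is deleted from that namespace even when the caller passes a different one.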



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########


Review Comment:
   I feel like we are lacking some test coverage on this path.
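
As a starting point for such coverage, the name-derivation steps of `_delete_from_yaml_single_item` can be checked without a cluster. The sketch below copies that logic into two hypothetical helpers (`api_class_name` and `kind_to_snake_case` do not exist in the PR) and exercises them with plain asserts:

```python
import re


def api_class_name(api_version: str) -> str:
    """Map an apiVersion such as 'apps/v1' to a client class name such as 'AppsV1Api'."""
    group, _, version = api_version.partition("/")
    if version == "":
        # A bare version such as "v1" belongs to the core group.
        version = group
        group = "core"
    # Drop only the trailing ".k8s.io", e.g. in "apiextensions.k8s.io".
    group = "".join(group.rsplit(".k8s.io", 1))
    group = "".join(word.capitalize() for word in group.split("."))
    return f"{group}{version.capitalize()}Api"


def kind_to_snake_case(kind: str) -> str:
    """Convert a CamelCase kind such as 'PersistentVolumeClaim' to snake_case."""
    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()


def test_api_class_name() -> None:
    assert api_class_name("v1") == "CoreV1Api"
    assert api_class_name("apps/v1") == "AppsV1Api"
    assert api_class_name("apiextensions.k8s.io/v1") == "ApiextensionsV1Api"


def test_kind_to_snake_case() -> None:
    assert kind_to_snake_case("Pod") == "pod"
    assert kind_to_snake_case("PersistentVolumeClaim") == "persistent_volume_claim"
```

The delete calls themselves would still need a mocked API client; that part is not shown here.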



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    # get group and version from apiVersion
+    group, _, version = yml_document["apiVersion"].partition("/")
+    if version == "":
+        version = group
+        group = "core"
+    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
+    # Only replace the last instance
+    group = "".join(group.rsplit(".k8s.io", 1))
+    # convert group name from DNS subdomain format to
+    # python class name convention
+    group = "".join(word.capitalize() for word in group.split("."))
+    fcn_to_call = f"{group}{version.capitalize()}Api"
+    k8s_api = getattr(client, fcn_to_call)(k8s_client)
+    # Replace CamelCased action_type into snake_case
+    kind = yml_document["kind"]
+    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
+    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
+
+    # Decide which namespace we are going to use for deleting the object
+    # IMPORTANT: Its ignore namespace in args:
+    #    create_from_yaml_single_item have same behaviour
+    if "namespace" in yml_document["metadata"]:
+        namespace = yml_document["metadata"]["namespace"]
+    name = yml_document["metadata"]["name"]
+
+    # Expect the user to delete namespaced objects more often
+    if hasattr(k8s_api, f"delete_namespaced_{kind}"):
+        resp: client.V1Status = getattr(k8s_api, f"delete_namespaced_{kind}")(
+            name=name, namespace=namespace, body=body, **kwargs
+        )
+    else:
+        resp: client.V1Status = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs)
+    if verbose:
+        print(f"{kind} deleted. status='{str(resp.status)}'")
+    return resp
+
+
+class FailToDeleteError(Exception):
+    """
+    An exception class for handling error if an error occurred when
+    handling a yaml file during deletion of the resource.
+    """
+
+    def __init__(self, api_exceptions):
+        self.api_exceptions = api_exceptions
+
+    def __str__(self):
+        msg = ""
+        for api_exception in self.api_exceptions:
+            msg += f"Error from server ({api_exception.reason}):{api_exception.body}"

Review Comment:
   Won't these all run together?



##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict
+from airflow.utils import yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = "kubernetes_default",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self._namespace = namespace

Review Comment:
   Is there a reason this one is private, but none of the others are?



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(

Review Comment:
   Might as well do the same here re: keyword-only arguments and typing.
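
A rough sketch of what a keyword-only, typed signature could look like. The function name `delete_single_item` and the `Any` annotations are placeholders chosen so the snippet runs without the `kubernetes` package; the real helper would presumably use the client's own types.

```python
from typing import Any, Dict, Optional


def delete_single_item(
    *,  # everything after the bare `*` must be passed by keyword
    k8s_client: Any,
    yml_document: Dict[str, Any],
    verbose: bool = False,
    namespace: str = "default",
    body: Optional[Any] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Stand-in for the private helper; it only echoes what it would delete."""
    name = yml_document["metadata"]["name"]
    return {"name": name, "namespace": namespace, "verbose": verbose}


doc = {"kind": "PersistentVolumeClaim", "metadata": {"name": "toto"}}

# Keyword call works.
result = delete_single_item(k8s_client=None, yml_document=doc, namespace="default")

# A positional call is rejected with a TypeError.
try:
    delete_single_item(None, doc)
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

With this shape, a caller can no longer pass `verbose` positionally by accident, as the current call sites do.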





[GitHub] [airflow] jedcunningham commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1214652993


##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (
+    KubernetesCreateResourceOperator,
+    KubernetesDeleteResourceOperator,
+)
+
+# [END import_module]
+
+
+# [START instantiate_dag]
+
+pvc_name = "toto"
+
+pvc_conf = f"""
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {pvc_name}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  storageClassName: standard
+  resources:
+    requests:
+      storage: 5Gi
+"""
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_kubernetes_resource_operator"
+
+
+with DAG(
+    DAG_ID,
+    default_args={"max_active_runs": 1},
+    description="create and delete a PVC in a kubernetes",
+    schedule=timedelta(days=1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    # [START KubernetesCreateResourceOperator_DAG]
+    # [START KubernetesDeleteResourceOperator_DAG]

Review Comment:
   ```suggestion
   ```
   
   Not sure we need these.



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (
+    KubernetesCreateResourceOperator,
+    KubernetesDeleteResourceOperator,
+)
+
+# [END import_module]
+
+
+# [START instantiate_dag]
+
+pvc_name = "toto"
+
+pvc_conf = f"""
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {pvc_name}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  storageClassName: standard
+  resources:
+    requests:
+      storage: 5Gi
+"""
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_kubernetes_resource_operator"
+
+
+with DAG(
+    DAG_ID,
+    default_args={"max_active_runs": 1},
+    description="create and delete a PVC in a kubernetes",
+    schedule=timedelta(days=1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    # [START KubernetesCreateResourceOperator_DAG]
+    # [START KubernetesDeleteResourceOperator_DAG]
+    t1 = KubernetesCreateResourceOperator(
+        task_id="create_pvc",
+        yaml_conf=pvc_conf,
+    )
+
+    t2 = KubernetesDeleteResourceOperator(
+        task_id="delete_pvc",
+        yaml_conf=pvc_conf,
+    )
+
+    t1 >> t2
+    # [END KubernetesDeleteResourceOperator_DAG]
+    # [END KubernetesCreateResourceOperator_DAG]

Review Comment:
   ```suggestion
   ```



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (
+    KubernetesCreateResourceOperator,
+    KubernetesDeleteResourceOperator,
+)
+
+# [END import_module]
+
+
+# [START instantiate_dag]
+

Review Comment:
   ```suggestion
   ```
   
   or these.



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.resource import (

Review Comment:
   ```suggestion
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.resource import (
   ```
   
   nit



##########
tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator.
+In this example, we create two tasks which execute sequentially.
+The first task is to create a PVC on Kubernetes cluster.
+and the second task is to delete the PVC.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+# [START import_module]

Review Comment:
   ```suggestion
   ```
   
   Or this.





[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteOperator & KubernetesCreateOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1475296181

   missing delete_from_yaml : https://github.com/kubernetes-client/python/issues/940 




[GitHub] [airflow] raphaelauv commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1163342477


##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict
+from airflow.utils import yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = "kubernetes_default",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self._namespace = namespace

Review Comment:
   Yes, because there is a getter: `def get_namespace(self) -> str:`
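
A small sketch of the private-attribute-plus-getter pattern being referred to. Only the `get_namespace(self) -> str` signature comes from this thread; the class names and the fallback order (explicit argument, then the hook, then `"default"`) are assumptions made for illustration.

```python
from typing import Optional


class FakeHook:
    """Stand-in for a connection hook that may know a namespace."""

    def __init__(self, namespace: Optional[str]) -> None:
        self._ns = namespace

    def get_namespace(self) -> Optional[str]:
        return self._ns


class ResourceOperatorSketch:
    """Hypothetical operator: `_namespace` stays private, callers use the getter."""

    def __init__(self, *, hook: FakeHook, namespace: Optional[str] = None) -> None:
        self._namespace = namespace
        self.hook = hook

    def get_namespace(self) -> str:
        # Assumed fallback chain: explicit argument, then hook, then "default".
        if self._namespace:
            return self._namespace
        return self.hook.get_namespace() or "default"


explicit = ResourceOperatorSketch(hook=FakeHook("from-conn"), namespace="explicit-ns")
from_hook = ResourceOperatorSketch(hook=FakeHook("from-conn"))
fallback = ResourceOperatorSketch(hook=FakeHook(None))
```

The raw attribute can be `None`, so reading it directly would skip the fallback; that is the usual reason to keep it private behind a getter.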
   





[GitHub] [airflow] raphaelauv commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1206530768


##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+import yaml
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
+
+__all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"]
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """
+    Abstract base class for all Kubernetes Resource operators.
+
+    :param yaml_conf: string. Contains the kubernetes resources to Create or Delete
+    :param namespace: string. Contains the namespace to create all resources inside.
+        The namespace must preexist otherwise the resource creation will fail.
+        If the API object in the yaml file already contains a namespace definition then
+        this parameter has no effect.
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
+        for the Kubernetes cluster.
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used.
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, default value is ``~/.kube/config``
+    """
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,

Review Comment:
   Because they are still present in the KPO. But if the objective is to get rid of them in the near future, then yes, I can remove them from this new operator.





[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteOperator & KubernetesCreateOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1475302721

   Inspired by https://github.com/tomplus/kubernetes_asyncio/pull/239




[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1491903916

   @jedcunningham thanks for your first review. Could you take a second look at the PR? Thanks.




[GitHub] [airflow] jedcunningham commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1178051639


##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    # get group and version from apiVersion
+    group, _, version = yml_document["apiVersion"].partition("/")
+    if version == "":
+        version = group
+        group = "core"
+    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
+    # Only replace the last instance
+    group = "".join(group.rsplit(".k8s.io", 1))
+    # convert group name from DNS subdomain format to
+    # python class name convention
+    group = "".join(word.capitalize() for word in group.split("."))
+    fcn_to_call = f"{group}{version.capitalize()}Api"
+    k8s_api = getattr(client, fcn_to_call)(k8s_client)
+    # Replace CamelCased action_type into snake_case
+    kind = yml_document["kind"]
+    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
+    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
+
+    # Decide which namespace we are going to use for deleting the object
+    # IMPORTANT: a namespace set in the document's metadata overrides the
+    #    namespace argument; create_from_yaml_single_item behaves the same way
+    if "namespace" in yml_document["metadata"]:
+        namespace = yml_document["metadata"]["namespace"]
+    name = yml_document["metadata"]["name"]
+
+    # Expect the user to delete namespaced objects more often
+    if hasattr(k8s_api, f"delete_namespaced_{kind}"):
+        resp: client.V1Status = getattr(k8s_api, f"delete_namespaced_{kind}")(
+            name=name, namespace=namespace, body=body, **kwargs
+        )
+    else:
+        resp: client.V1Status = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs)
+    if verbose:
+        print(f"{kind} deleted. status='{str(resp.status)}'")
+    return resp
+
+
+class FailToDeleteError(Exception):
+    """
+    An exception class for handling error if an error occurred when
+    handling a yaml file during deletion of the resource.
+    """
+
+    def __init__(self, api_exceptions):
+        self.api_exceptions = api_exceptions
+
+    def __str__(self):
+        msg = ""
+        for api_exception in self.api_exceptions:
+            msg += f"Error from server ({api_exception.reason}):{api_exception.body}"

Review Comment:
   More specifically, does the exception body have a newline at the end? Or will these just get strung together into a single line with no space/delimiter?
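
To make the concern concrete, here is a minimal sketch, assuming `__str__` ends by returning `msg` (the quoted hunk stops before that line). `FakeApiException` is a hypothetical stand-in for `kubernetes.client.rest.ApiException` that carries only `reason` and `body`. Unless each body happens to end in a newline, the concatenated form runs the messages together, while joining on a newline keeps one error per line.

```python
from dataclasses import dataclass


@dataclass
class FakeApiException:
    """Hypothetical stand-in for ApiException: only the two fields used here."""

    reason: str
    body: str


def concatenated(api_exceptions):
    # Mirrors the loop in the quoted __str__: no delimiter between entries.
    msg = ""
    for exc in api_exceptions:
        msg += f"Error from server ({exc.reason}):{exc.body}"
    return msg


def newline_joined(api_exceptions):
    # Alternative: one error per line, whatever the body ends with.
    return "\n".join(
        f"Error from server ({exc.reason}): {exc.body}" for exc in api_exceptions
    )


errors = [
    FakeApiException("NotFound", '{"code": 404}'),
    FakeApiException("Forbidden", '{"code": 403}'),
]
print(concatenated(errors))
print(newline_joined(errors))
```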



##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########


Review Comment:
   This is still missing coverage. We only have the single document path covered.
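
A test for the `List` branch could start from something like the following. This is only a sketch: to stay self-contained it re-implements the fan-out step in a hypothetical helper, `expand_list_document`, instead of importing `delete_from_dict`. A real test would call the provider function with a mocked client and assert on the delete calls. Unlike the quoted code, the helper copies each item rather than mutating it in place.

```python
import copy


def expand_list_document(yml_document):
    """Hypothetical helper mirroring the List branch: one document per item."""
    if "List" not in yml_document["kind"]:
        return [yml_document]
    kind = yml_document["kind"].replace("List", "")
    docs = []
    for item in yml_document["items"]:
        doc = copy.deepcopy(item)
        if kind != "":
            # Typed lists (e.g. PersistentVolumeClaimList) carry kind and
            # apiVersion on the wrapper, so copy them down to each item.
            doc["apiVersion"] = yml_document["apiVersion"]
            doc["kind"] = kind
        docs.append(doc)
    return docs


typed_list = {
    "apiVersion": "v1",
    "kind": "PersistentVolumeClaimList",
    "items": [{"metadata": {"name": "a"}}, {"metadata": {"name": "b"}}],
}
for doc in expand_list_document(typed_list):
    print(doc["kind"], doc["metadata"]["name"])
```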





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1205654949


##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+import yaml
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
+

Review Comment:
   ```suggestion
   
   __all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"]
   ```



##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+import yaml
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """Abstract base class for all Kubernetes Resource operators."""
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,

Review Comment:
   Can you add a docstring for these parameters, especially for `namespace`?
   ```
   namespace: string. Contains the namespace to create all
           resources inside. The namespace must preexist otherwise
           the resource creation will fail. If the API object in
           the yaml file already contains a namespace definition
           this parameter has no effect.
   ```
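
The precedence described in that docstring can be sketched as below. `resolve_namespace` is a hypothetical helper that follows the check in the quoted `delete_from.py` hunk; it is not part of the provider.

```python
def resolve_namespace(yml_document, namespace="default"):
    """Return the namespace a create/delete call would target."""
    metadata = yml_document["metadata"]
    if "namespace" in metadata:
        # A namespace in the manifest wins over the operator argument.
        return metadata["namespace"]
    return namespace


with_ns = {"metadata": {"name": "my-pvc", "namespace": "data"}}
without_ns = {"metadata": {"name": "my-pvc"}}

print(resolve_namespace(with_ns, namespace="airflow"))
print(resolve_namespace(without_ns, namespace="airflow"))
```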





[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1575631041

   @potiuk could we merge? Thanks




[GitHub] [airflow] raphaelauv commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1163344192


##########
airflow/providers/cncf/kubernetes/utils/delete_from.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files
+
+from __future__ import annotations
+
+import re
+
+from kubernetes import client
+
+DEFAULT_DELETION_BODY = client.V1DeleteOptions(
+    propagation_policy="Background",
+    grace_period_seconds=5,
+)
+
+
+def delete_from_dict(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    api_exceptions = []
+    if "List" in yml_document["kind"]:
+        kind = yml_document["kind"].replace("List", "")
+        for yml_doc in yml_document["items"]:
+            if kind != "":
+                yml_doc["apiVersion"] = yml_document["apiVersion"]
+                yml_doc["kind"] = kind
+            try:
+                _delete_from_yaml_single_item(
+                    k8s_client,
+                    yml_doc,
+                    verbose,
+                    namespace=namespace,
+                    body=body,
+                    **kwargs,
+                )
+            except client.rest.ApiException as api_exception:
+                api_exceptions.append(api_exception)
+    else:
+
+        try:
+            _delete_from_yaml_single_item(
+                k8s_client,
+                yml_document,
+                verbose,
+                namespace=namespace,
+                body=body,
+                **kwargs,
+            )
+        except client.rest.ApiException as api_exception:
+            api_exceptions.append(api_exception)
+
+    if api_exceptions:
+        raise FailToDeleteError(api_exceptions)
+
+
+def _delete_from_yaml_single_item(
+    k8s_client,
+    yml_document,
+    verbose=False,
+    namespace="default",
+    body=None,
+    **kwargs,
+):
+    if body is None:
+        body = DEFAULT_DELETION_BODY
+
+    # get group and version from apiVersion
+    group, _, version = yml_document["apiVersion"].partition("/")
+    if version == "":
+        version = group
+        group = "core"
+    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
+    # Only replace the last instance
+    group = "".join(group.rsplit(".k8s.io", 1))
+    # convert group name from DNS subdomain format to
+    # python class name convention
+    group = "".join(word.capitalize() for word in group.split("."))
+    fcn_to_call = f"{group}{version.capitalize()}Api"
+    k8s_api = getattr(client, fcn_to_call)(k8s_client)
+    # Replace CamelCased action_type into snake_case
+    kind = yml_document["kind"]
+    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
+    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()
+
+    # Decide which namespace we are going to use for deleting the object
+    # IMPORTANT: a namespace set in the document's metadata overrides the
+    #    namespace argument; create_from_yaml_single_item behaves the same way
+    if "namespace" in yml_document["metadata"]:
+        namespace = yml_document["metadata"]["namespace"]
+    name = yml_document["metadata"]["name"]
+
+    # Expect the user to delete namespaced objects more often
+    if hasattr(k8s_api, f"delete_namespaced_{kind}"):
+        resp: client.V1Status = getattr(k8s_api, f"delete_namespaced_{kind}")(
+            name=name, namespace=namespace, body=body, **kwargs
+        )
+    else:
+        resp: client.V1Status = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs)
+    if verbose:
+        print(f"{kind} deleted. status='{str(resp.status)}'")
+    return resp
+
+
+class FailToDeleteError(Exception):
+    """
+    An exception class for handling error if an error occurred when
+    handling a yaml file during deletion of the resource.
+    """
+
+    def __init__(self, api_exceptions):
+        self.api_exceptions = api_exceptions
+
+    def __str__(self):
+        msg = ""
+        for api_exception in self.api_exceptions:
+            msg += f"Error from server ({api_exception.reason}):{api_exception.body}"

Review Comment:
   If there is a list of multiple resources, each deletion is attempted.
   
   If an error occurs, the call fails at the end and logs the error.
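
The behaviour described here (attempt every deletion, fail once at the end) can be sketched as follows. All names are hypothetical: `DeletionFailed` plays the role of `FailToDeleteError`, and `RuntimeError` stands in for `ApiException` so the sketch runs without the `kubernetes` package.

```python
class DeletionFailed(Exception):
    """Hypothetical aggregate error raised after every item was attempted."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} deletion(s) failed")
        self.errors = errors


def delete_all(names, delete_one):
    errors = []
    deleted = []
    for name in names:
        try:
            delete_one(name)
            deleted.append(name)
        except RuntimeError as err:
            # Remember the failure but keep going with the remaining items.
            errors.append(err)
    if errors:
        raise DeletionFailed(errors)
    return deleted


attempted = []


def flaky_delete(name):
    attempted.append(name)
    if name == "pvc-b":
        raise RuntimeError(f"{name} not found")


try:
    delete_all(["pvc-a", "pvc-b", "pvc-c"], flaky_delete)
except DeletionFailed as failure:
    print(attempted)
    print([str(err) for err in failure.errors])
```

The point of the design is that a failure on one item does not prevent the remaining items from being attempted.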





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29930:
URL: https://github.com/apache/airflow/pull/29930#discussion_r1206346858


##########
airflow/providers/cncf/kubernetes/operators/resource.py:
##########
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manage a Kubernetes Resource"""
+
+from __future__ import annotations
+
+import yaml
+from kubernetes.client import ApiClient
+from kubernetes.utils import create_from_yaml
+
+from airflow.compat.functools import cached_property
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
+
+__all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"]
+
+
+class KubernetesResourceBaseOperator(BaseOperator):
+    """
+    Abstract base class for all Kubernetes Resource operators.
+
+    :param yaml_conf: string. Contains the kubernetes resources to Create or Delete
+    :param namespace: string. Contains the namespace to create all resources inside.
+        The namespace must preexist otherwise the resource creation will fail.
+        If the API object in the yaml file already contains a namespace definition then
+        this parameter has no effect.
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
+        for the Kubernetes cluster.
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used.
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, default value is ``~/.kube/config``
+    """
+
+    template_fields = ("yaml_conf",)
+    template_fields_renderers = {"yaml_conf": "yaml"}
+
+    def __init__(
+        self,
+        *,
+        yaml_conf: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,

Review Comment:
   Why do we need these arguments? We can specify them in the connection extras:
   - `in_cluster` extra for the in_cluster configuration
   - `cluster_context` extra for the cluster context
   - `kube_config_path` extra for the config file path
   - even `kube_config` extra for the config file content
   
   IMO it's better to keep a single interface for specifying this configuration, namely the Kubernetes connection. Wdyt?
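
As an illustration of what that single interface might carry, here is a sketch that builds the extras payload as JSON using only the standard library. The key names are taken from the list above and are an assumption here: the keys actually accepted may differ between provider versions, so check them against the installed release. The context name is made up.

```python
import json

# Keys copied from the review comment; treat them as assumptions.
extras = {
    "in_cluster": False,
    "cluster_context": "my-context",  # hypothetical context name
    "kube_config_path": "~/.kube/config",
}

# The connection's "extra" field is stored as a JSON string.
extra_json = json.dumps(extras, sort_keys=True)
print(extra_json)
```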





[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1535902709

   @eladkal thanks again for your review. I addressed all your remarks; could you please re-review? Thanks




[GitHub] [airflow] raphaelauv commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteResourceOperator & KubernetesCreateResourceOperator

Posted by "raphaelauv (via GitHub)" <gi...@apache.org>.
raphaelauv commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1519612524

   flaky test related to https://github.com/apache/airflow/pull/30831




[GitHub] [airflow] jedcunningham commented on pull request #29930: KubernetesResourceOperator - create PVC & delete PVC

Posted by "jedcunningham (via GitHub)" <gi...@apache.org>.
jedcunningham commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1456562276

   I'm hesitant to build create/delete operators for every single resource type, especially if you are going to ask users to provide everything via YAML anyway.
   
   Check out [create_from_yaml](https://github.com/kubernetes-client/python/blob/617e5f520fc30d954a2465cd82bbdcfd4635472d/kubernetes/utils/create_from_yaml.py#L97) instead. We could probably use this to build a generic create/delete?
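
A generic create/delete depends on deriving the client class and method name from a manifest's `apiVersion` and `kind`. The sketch below follows the string handling in the `delete_from.py` hunk quoted elsewhere in this thread. `resolve_api_names` is a hypothetical helper; it only computes names and does not import the `kubernetes` package.

```python
import re


def resolve_api_names(api_version, kind):
    """Return (client class name, snake_case kind) for a manifest."""
    group, _, version = api_version.partition("/")
    if version == "":
        # "v1" has no group prefix: it belongs to the core API group.
        version = group
        group = "core"
    # Drop a trailing ".k8s.io", e.g. "apiextensions.k8s.io" -> "apiextensions".
    group = "".join(group.rsplit(".k8s.io", 1))
    # DNS-style group name -> CamelCase class-name prefix.
    group = "".join(word.capitalize() for word in group.split("."))
    api_class = f"{group}{version.capitalize()}Api"
    # CamelCase kind -> snake_case, as used in the client's method names.
    snake = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
    snake = re.sub("([a-z0-9])([A-Z])", r"\1_\2", snake).lower()
    return api_class, snake


api_class, snake_kind = resolve_api_names("v1", "PersistentVolumeClaim")
print(api_class, f"delete_namespaced_{snake_kind}")
```

With these two names, the caller looks up the class on `kubernetes.client` and then the `delete_namespaced_<kind>` or `delete_<kind>` method with `getattr`, as the quoted code does.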




[GitHub] [airflow] potiuk commented on pull request #29930: KubernetesResourceOperator - KubernetesDeleteOperator & KubernetesCreateOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29930:
URL: https://github.com/apache/airflow/pull/29930#issuecomment-1477937680

   errors to fix :(

