Posted to commits@airflow.apache.org by "VladaZakharova (via GitHub)" <gi...@apache.org> on 2023/02/01 12:55:12 UTC

[GitHub] [airflow] VladaZakharova opened a new pull request, #29266: Add deferrable mode to GKEStartPodOperator

VladaZakharova opened a new pull request, #29266:
URL: https://github.com/apache/airflow/pull/29266

   Add deferrable mode to GKEStartPodOperator
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] VladaZakharova closed pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova closed pull request #29266: Add deferrable mode to GKEStartPodOperator
URL: https://github.com/apache/airflow/pull/29266




[GitHub] [airflow] pankajastro commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143035484


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
 
-    @staticmethod
-    @contextmanager
-    def get_gke_config_file(

Review Comment:
   I believe public methods of operators and hooks need to stay backward compatible, so I'm not sure we can remove this one directly.



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
 
-    @staticmethod
-    @contextmanager
-    def get_gke_config_file(
-        gcp_conn_id,
-        project_id: str | None,
-        cluster_name: str,
-        impersonation_chain: str | Sequence[str] | None,
-        regional: bool,
-        location: str,
-        use_internal_ip: bool,
-    ) -> Generator[str, None, None]:
-
-        hook = GoogleBaseHook(gcp_conn_id=gcp_conn_id)
-        project_id = project_id or hook.project_id
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
 
-        if not project_id:
-            raise AirflowException(
-                "The project id must be passed either as "
-                "keyword project_id parameter or as project_id extra "
-                "in Google Cloud connection definition. Both are not set!"
+    @cached_property
+    def hook(self) -> GKEPodHook:  # type: ignore[override]
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
             )
 
-        # Write config to a temp file and set the environment variable to point to it.
-        # This is to avoid race conditions of reading/writing a single file
-        with tempfile.NamedTemporaryFile() as conf_file, patch_environ(
-            {KUBE_CONFIG_ENV_VAR: conf_file.name}
-        ), hook.provide_authorized_gcloud():
-            # Attempt to get/update credentials
-            # We call gcloud directly instead of using google-cloud-python api
-            # because there is no way to write kubernetes config to a file, which is
-            # required by KubernetesPodOperator.
-            # The gcloud command looks at the env variable `KUBECONFIG` for where to save
-            # the kubernetes config file.
-            cmd = [
-                "gcloud",
-                "container",
-                "clusters",
-                "get-credentials",
-                cluster_name,
-                "--project",
-                project_id,
-            ]
-            if impersonation_chain:
-                if isinstance(impersonation_chain, str):
-                    impersonation_account = impersonation_chain
-                elif len(impersonation_chain) == 1:
-                    impersonation_account = impersonation_chain[0]
-                else:
-                    raise AirflowException(
-                        "Chained list of accounts is not supported, please specify only one service account"
-                    )
-
-                cmd.extend(
-                    [
-                        "--impersonate-service-account",
-                        impersonation_account,
-                    ]
-                )
-            if regional:
-                cmd.append("--region")
-            else:
-                cmd.append("--zone")
-            cmd.append(location)
-            if use_internal_ip:
-                cmd.append("--internal-ip")
-            execute_in_subprocess(cmd)
-
-            # Tell `KubernetesPodOperator` where the config file is located
-            yield os.environ[KUBE_CONFIG_ENV_VAR]
+        hook = GKEPodHook(
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
 
-    def execute(self, context: Context) -> str | None:
+    def execute(self, context: Context):
+        """Executes process of creating pod and executing provided command inside it."""
+        self.fetch_cluster_info()
+        return super().execute(context)
 
-        with GKEStartPodOperator.get_gke_config_file(
-            gcp_conn_id=self.gcp_conn_id,
+    def fetch_cluster_info(self) -> tuple[str, str | None]:
+        """Fetches cluster info for connecting to it."""
+        cluster = self.cluster_hook.get_cluster(

Review Comment:
   Wondering if it makes sense to use the async `get_cluster(...)` here:
   https://github.com/googleapis/python-container/blob/main/google/cloud/container_v1/services/cluster_manager/async_client.py#L359 
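A minimal sketch of what the async variant suggested above might look like. It assumes the client exposes an awaitable `get_cluster(name=...)` (as `ClusterManagerAsyncClient` does) and that the cluster URL and CA certificate come from the `endpoint` and `master_auth.cluster_ca_certificate` fields. The helper name `fetch_cluster_info_async` and the `FakeAsyncClusterClient` stand-in are hypothetical and are not part of the PR; the stand-in exists only so the example runs without GCP credentials.

```python
import asyncio
from types import SimpleNamespace


async def fetch_cluster_info_async(client, project_id: str, location: str, cluster_name: str):
    """Return (cluster_url, ssl_ca_cert) without blocking the event loop."""
    # Fully qualified resource name expected by the GKE API.
    name = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
    cluster = await client.get_cluster(name=name)
    # Assumption: the URL is built from the cluster endpoint, as a sync version would do.
    return f"https://{cluster.endpoint}", cluster.master_auth.cluster_ca_certificate


class FakeAsyncClusterClient:
    """Hypothetical stand-in for ClusterManagerAsyncClient (offline use only)."""

    async def get_cluster(self, name: str):
        return SimpleNamespace(
            name=name,
            endpoint="10.0.0.1",
            master_auth=SimpleNamespace(cluster_ca_certificate="BASE64CERT"),
        )


url, cert = asyncio.run(
    fetch_cluster_info_async(FakeAsyncClusterClient(), "my-project", "us-central1-a", "my-cluster")
)
```

In a real trigger the fake client would be replaced by the Google async client, and the call would be awaited inside the trigger's `run` coroutine rather than through `asyncio.run`.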





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143059244


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -336,3 +343,195 @@ async def get_operation(
         return await client.get_operation(
             name=operation_path,
         )
+
+
+class GKEPodHook(GoogleBaseHook):
+    """Hook for managing Google Kubernetes Engine pod APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @property
+    def is_in_cluster(self) -> bool:
+        return False
+
+    @staticmethod
+    def get_xcom_sidecar_container_image():
+        """Returns the xcom sidecar image that defined in the connection"""
+        return PodDefaults.SIDECAR_CONTAINER.image
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        return client.ApiClient(configuration)
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+    def get_pod(self, name: str, namespace: str) -> V1Pod:
+        """
+        Gets pod's object.
+
+        :param name: Name of the pod.
+        :param namespace: Name of the pod's namespace.
+        """
+        return self.core_v1_client.read_namespaced_pod(
+            name=name,
+            namespace=namespace,
+        )
+
+
+class AsyncGKEPodHook(GoogleBaseAsyncHook):

Review Comment:
   Done





[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1418632790

   Hi @kaxil !
   Is there anything else I can improve in the code?
   Thanks!




[GitHub] [airflow] potiuk commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1470999113

   I have not followed the deferrable operators too closely. I know the work was somewhere in between Astronomer and Google :) and that you worked on it, so I guess it would take more time for me to get the full context. But maybe @kaxil and @phanikumv could chime in here?






[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1477424457

   @kaxil 
   Hi! Can you please review my changes?








[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1463572607

   @kaxil
   Please check the changes so I can merge them to main.




[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1118468868


##########
RELEASE_NOTES.rst:
##########
@@ -8625,7 +8625,7 @@ Bug fixes
 - [AIRFLOW-2645][AIRFLOW-2617] Add worker_container_image_pull_policy
 - [AIRFLOW-2661] fix config dags_volume_subpath and logs_volume_subpath
 - [AIRFLOW-3550] Standardize GKE hook (#4364)
-- [AIRFLOW-2863] Fix GKEClusterHook catching wrong exception (#3711)
+- [AIRFLOW-2863] Fix GKEHook catching wrong exception (#3711)

Review Comment:
   I have removed these changes





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143126084


##########
tests/providers/google/cloud/hooks/test_kubernetes_engine.py:
##########
@@ -291,6 +296,72 @@ def test_wait_for_response_running(self, time_mock, operation_mock):
         assert operation_mock.call_count == 2
 
 
+class TestAsyncGKEPodHook:

Review Comment:
   Done



##########
tests/providers/google/cloud/hooks/test_kubernetes_engine.py:
##########
@@ -20,12 +20,13 @@
 import sys
 from asyncio import Future
 
+import kubernetes.client
 import pytest
 from google.cloud.container_v1 import ClusterManagerAsyncClient
 from google.cloud.container_v1.types import Cluster
 
 from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, GKEHook
+from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, AsyncGKEPodHook, GKEHook

Review Comment:
   Done



##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -148,6 +148,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   Done





[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1457983272

   @kaxil @potiuk 
   Hi! Can you please review my PR?






[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143056466


##########
airflow/providers/google/cloud/triggers/kubernetes_engine.py:
##########
@@ -18,14 +18,108 @@
 from __future__ import annotations
 
 import asyncio
+from datetime import datetime
 from typing import Any, AsyncIterator, Sequence
 
 from google.cloud.container_v1.types import Operation
 
-from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook
+try:
+    from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+except ImportError:
+    # preserve backward compatibility for older versions of cncf.kubernetes provider
+    from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import KubernetesPodTrigger
+from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, AsyncGKEPodHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
+class GKEPodTrigger(KubernetesPodTrigger):

Review Comment:
   Done





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143059727


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -336,3 +343,195 @@ async def get_operation(
         return await client.get_operation(
             name=operation_path,
         )
+
+
+class GKEPodHook(GoogleBaseHook):
+    """Hook for managing Google Kubernetes Engine pod APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @property
+    def is_in_cluster(self) -> bool:
+        return False
+
+    @staticmethod
+    def get_xcom_sidecar_container_image():
+        """Returns the xcom sidecar image that defined in the connection"""
+        return PodDefaults.SIDECAR_CONTAINER.image
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        return client.ApiClient(configuration)
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+    def get_pod(self, name: str, namespace: str) -> V1Pod:
+        """
+        Gets pod's object.
+
+        :param name: Name of the pod.
+        :param namespace: Name of the pod's namespace.
+        """
+        return self.core_v1_client.read_namespaced_pod(
+            name=name,
+            namespace=namespace,
+        )
+
+
+class AsyncGKEPodHook(GoogleBaseAsyncHook):
+    """
+    Hook for managing Google Kubernetes Engine pods APIs in asynchronous way.
+
+    :param cluster_url: The URL pointed to the cluster.
+    :param ssl_ca_cert: SSL certificate that is used for authentication to the pod.
+    """
+
+    sync_hook_class = GKEPodHook
+    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        **kwargs,
+    ):
+
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+        kwargs.update(
+            cluster_url=cluster_url,
+            ssl_ca_cert=ssl_ca_cert,
+        )
+        super().__init__(**kwargs)
+
+    @contextlib.asynccontextmanager
+    async def get_conn(self, token: Token) -> async_client.ApiClient:  # type: ignore[override]

Review Comment:
   This was done to pass the pre-commit check, since the signature of this method differs from that of the method in the superclass BaseHook.
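To illustrate the point about the signature mismatch, here is a small self-contained sketch. `BaseHookSketch` and `AsyncPodHookSketch` are hypothetical stand-ins, not Airflow classes. The subclass adds a required `token` argument and turns the method into an async context manager, which a type checker such as mypy reports under the `override` error code unless it is silenced on that line. At runtime the code works regardless.

```python
import asyncio
import contextlib


class BaseHookSketch:
    """Hypothetical base class whose get_conn takes no extra arguments."""

    def get_conn(self):
        raise NotImplementedError


class AsyncPodHookSketch(BaseHookSketch):
    """Hypothetical subclass that changes the get_conn signature."""

    @contextlib.asynccontextmanager
    async def get_conn(self, token: str):  # type: ignore[override]
        # A real hook would build and later close an API client here;
        # the string is only a placeholder for that client object.
        yield f"client-authorized-with-{token}"


async def main() -> str:
    async with AsyncPodHookSketch().get_conn("abc") as conn:
        return conn


result = asyncio.run(main())
```

The ignore comment is scoped to a single error code, so other type errors on the same line would still be reported.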





[GitHub] [airflow] pankajastro commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1152406606


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
 
-    @staticmethod
-    @contextmanager
-    def get_gke_config_file(

Review Comment:
   I think it would be good to have a warning, but let's see what others think...
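One way the warning discussed in this thread could look, as a rough sketch rather than the change that was actually made: keep `get_gke_config_file` as a thin shim that emits a deprecation warning before doing its old work. `OperatorSketch` is a hypothetical stand-in for the operator, the built-in `DeprecationWarning` stands in for whatever warning class the project prefers, and the yielded path is a placeholder for the old kubeconfig logic.

```python
import warnings
from contextlib import contextmanager


class OperatorSketch:
    """Hypothetical stand-in for the operator that owns the public method."""

    @staticmethod
    @contextmanager
    def get_gke_config_file(*args, **kwargs):
        # The warning fires when the `with` block is entered; stacklevel=3
        # skips the generator frame and contextlib's __enter__ frame so the
        # warning points at the caller's `with` statement.
        warnings.warn(
            "get_gke_config_file is deprecated and will be removed in a future release.",
            DeprecationWarning,
            stacklevel=3,
        )
        # Placeholder: the original body (writing a temporary kubeconfig) would stay here.
        yield "/tmp/placeholder-kubeconfig"


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    with OperatorSketch.get_gke_config_file() as config_path:
        pass
```

Existing callers keep working under this approach, and they get one release cycle of notice before the method is dropped.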





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143080692


##########
tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py:
##########
@@ -68,9 +68,11 @@
         in_cluster=False,
         is_delete_operator_pod=True,
         get_logs=True,
+        deferrable=True,
     )
 
-    pod_task_xcom = GKEStartPodOperator(
+    # [START howto_operator_gke_start_pod_xcom_async]
+    pod_task_xcom_async = GKEStartPodOperator(
         task_id="pod_task_xcom",

Review Comment:
   Done





[GitHub] [airflow] kaxil commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1105839138


##########
RELEASE_NOTES.rst:
##########
@@ -8625,7 +8625,7 @@ Bug fixes
 - [AIRFLOW-2645][AIRFLOW-2617] Add worker_container_image_pull_policy
 - [AIRFLOW-2661] fix config dags_volume_subpath and logs_volume_subpath
 - [AIRFLOW-3550] Standardize GKE hook (#4364)
-- [AIRFLOW-2863] Fix GKEClusterHook catching wrong exception (#3711)
+- [AIRFLOW-2863] Fix GKEHook catching wrong exception (#3711)

Review Comment:
   Can you revert the unrelated change, please?





[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1469633214

   @potiuk 
   Hi! Could you please check these changes? My team is really waiting for these changes :)




[GitHub] [airflow] kaxil commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1105841018


##########
airflow/contrib/hooks/__init__.py:
##########
@@ -154,7 +154,6 @@
     },
     'gcp_container_hook': {
         'GKEHook': 'airflow.providers.google.cloud.hooks.kubernetes_engine.GKEHook',
-        'GKEClusterHook': 'airflow.providers.google.cloud.hooks.kubernetes_engine.GKEHook',

Review Comment:
   Why was this entry removed?





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1118468467


##########
airflow/contrib/hooks/__init__.py:
##########
@@ -154,7 +154,6 @@
     },
     'gcp_container_hook': {
         'GKEHook': 'airflow.providers.google.cloud.hooks.kubernetes_engine.GKEHook',
-        'GKEClusterHook': 'airflow.providers.google.cloud.hooks.kubernetes_engine.GKEHook',

Review Comment:
   My fault :)
   





[GitHub] [airflow] phanikumv commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1477428263

   @pankajastro and I are looking into these changes.




[GitHub] [airflow] kaxil commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1093120537


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -25,29 +25,44 @@
 """
 from __future__ import annotations
 
+import contextlib
 import json
 import time
 import warnings
 from typing import Sequence
 
+import google.auth.credentials
+from gcloud.aio.auth import Token
 from google.api_core.exceptions import AlreadyExists, NotFound
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
+from google.auth.transport import requests as google_requests
 
 # not sure why but mypy complains on missing `container_v1` but it is clearly there and is importable
 from google.cloud import container_v1, exceptions  # type: ignore[attr-defined]
 from google.cloud.container_v1 import ClusterManagerClient
 from google.cloud.container_v1.types import Cluster, Operation
+from kubernetes import client
+from kubernetes_asyncio import client as async_client
+from kubernetes_asyncio.client.models import V1Pod
+from kubernetes_asyncio.config.kube_config import FileOrData
+from urllib3.exceptions import HTTPError
 
 from airflow import version
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
+from airflow.kubernetes.pod_generator_deprecated import PodDefaults
 from airflow.providers.google.common.consts import CLIENT_INFO
-from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook
+from airflow.providers.google.common.hooks.base_google import (
+    PROVIDE_PROJECT_ID,
+    GoogleBaseAsyncHook,
+    GoogleBaseHook,
+)
 
 OPERATIONAL_POLL_INTERVAL = 15
 
 
-class GKEHook(GoogleBaseHook):
+class GKEClusterHook(GoogleBaseHook):

Review Comment:
   Can you add backwards compatibility if possible, please? Users might be using this `Hook`.





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1093161447


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -25,29 +25,44 @@
 """
 from __future__ import annotations
 
+import contextlib
 import json
 import time
 import warnings
 from typing import Sequence
 
+import google.auth.credentials
+from gcloud.aio.auth import Token
 from google.api_core.exceptions import AlreadyExists, NotFound
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
+from google.auth.transport import requests as google_requests
 
 # not sure why but mypy complains on missing `container_v1` but it is clearly there and is importable
 from google.cloud import container_v1, exceptions  # type: ignore[attr-defined]
 from google.cloud.container_v1 import ClusterManagerClient
 from google.cloud.container_v1.types import Cluster, Operation
+from kubernetes import client
+from kubernetes_asyncio import client as async_client
+from kubernetes_asyncio.client.models import V1Pod
+from kubernetes_asyncio.config.kube_config import FileOrData
+from urllib3.exceptions import HTTPError
 
 from airflow import version
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
+from airflow.kubernetes.pod_generator_deprecated import PodDefaults
 from airflow.providers.google.common.consts import CLIENT_INFO
-from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook
+from airflow.providers.google.common.hooks.base_google import (
+    PROVIDE_PROJECT_ID,
+    GoogleBaseAsyncHook,
+    GoogleBaseHook,
+)
 
 OPERATIONAL_POLL_INTERVAL = 15
 
 
-class GKEHook(GoogleBaseHook):
+class GKEClusterHook(GoogleBaseHook):

Review Comment:
   Sure, good idea!
   The name was changed to suit the hooks' purposes: there are two separate hook classes, one for cluster management and one for pod management. I will revert the name change and leave a descriptive comment below.
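
For illustration only: the author chose to revert the rename, but if a rename like this were kept, backwards compatibility could be preserved with a thin deprecated alias. The sketch below is a minimal, hypothetical example; both classes are stand-ins, not the provider's actual hooks, and the `location` argument is used only to show that arguments pass through.

```python
from __future__ import annotations

import warnings


class GKEClusterHook:
    """Stand-in for the renamed hook (hypothetical, not the provider class)."""

    def __init__(self, location: str | None = None) -> None:
        self.location = location


class GKEHook(GKEClusterHook):
    """Hypothetical alias that keeps the old name importable."""

    def __init__(self, *args, **kwargs) -> None:
        # Warn once per instantiation, pointing at the caller's line.
        warnings.warn(
            "GKEHook is deprecated; use GKEClusterHook instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
```

Existing code that instantiates the old name keeps working and receives a `DeprecationWarning`, which gives users a release cycle to migrate.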





[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1410585964

   @potiuk @kaxil 
   Hi!
   Could you please check the changes for the GKEStartPodOperator? This is the second part of the changes from the original PR. Many thanks!




[GitHub] [airflow] VladaZakharova commented on pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #29266:
URL: https://github.com/apache/airflow/pull/29266#issuecomment-1500011338

   @pankajastro Could you please review my changes once again? Thanks! :) 




[GitHub] [airflow] phanikumv commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143102891


##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -148,6 +148,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   Please also explain how running this pod in deferrable mode would benefit the user.



##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -148,6 +148,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   Please also add how running this in deferrable mode would benefit the user
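
As background for the benefit being asked about: a deferrable operator hands its waiting over to an asyncio-based trigger, so the worker slot is released while the pod runs. The sketch below is not Airflow code; `wait_for_pod` is a hypothetical stand-in that simulates polling with `asyncio.sleep`. It only shows that a single event loop can wait on several pods at once.

```python
from __future__ import annotations

import asyncio
import time


async def wait_for_pod(name: str, seconds: float) -> str:
    """Hypothetical stand-in for polling a pod until it completes."""
    # Awaiting yields control to the event loop instead of blocking a thread.
    await asyncio.sleep(seconds)
    return f"{name}: done"


async def main() -> list[str]:
    # Five simulated pods are awaited concurrently on one event loop.
    return await asyncio.gather(*(wait_for_pod(f"pod-{i}", 0.2) for i in range(5)))


start = time.monotonic()
results = asyncio.run(main())
elapsed = time.monotonic() - start
```

Waiting on the five simulated pods one after another would take about a second; awaited concurrently, the total is close to the longest single wait.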





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143106419


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
 
-    @staticmethod
-    @contextmanager
-    def get_gke_config_file(

Review Comment:
   The method was changed to use a different way of connecting: instead of invoking a gcloud command directly, the operator now uses the cluster's SSL certificate and URL to connect to the specific cluster.
   Can I add a `deprecation warning` for this method instead?
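
A minimal sketch of what such a deprecation could look like, assuming the method stays in place for a transition period. `StartPodOperatorSketch` and the single `cluster_name` argument are hypothetical simplifications, and the body only yields a temporary file path instead of calling gcloud.

```python
from __future__ import annotations

import tempfile
import warnings
from contextlib import contextmanager
from typing import Generator


class StartPodOperatorSketch:
    """Hypothetical stand-in for the operator; only the deprecated method is shown."""

    @staticmethod
    @contextmanager
    def get_gke_config_file(cluster_name: str) -> Generator[str, None, None]:
        # The warning fires when the `with` block is entered; stacklevel=3 skips
        # the contextlib frame so it points at the caller's `with` statement.
        warnings.warn(
            "get_gke_config_file is deprecated and will be removed in a future release.",
            DeprecationWarning,
            stacklevel=3,
        )
        # Placeholder for the old behaviour: hand back a temporary config path.
        with tempfile.NamedTemporaryFile() as conf_file:
            yield conf_file.name
```

Callers that still use the context manager keep working and see the warning each time they enter the `with` block.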





[GitHub] [airflow] phanikumv commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143097143


##########
tests/providers/google/cloud/hooks/test_kubernetes_engine.py:
##########
@@ -20,12 +20,13 @@
 import sys
 from asyncio import Future
 
+import kubernetes.client
 import pytest
 from google.cloud.container_v1 import ClusterManagerAsyncClient
 from google.cloud.container_v1.types import Cluster
 
 from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, GKEHook
+from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, AsyncGKEPodHook, GKEHook

Review Comment:
   ```suggestion
   from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEAsyncHook, GKEPodAsyncHook, GKEHook
   ```
   Naming should be in line with other async hooks across the repo.



##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -148,6 +148,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   ```suggestion
   You can set the parameter deferrable = True in order to run the operator asynchronously:
   ```



##########
tests/providers/google/cloud/hooks/test_kubernetes_engine.py:
##########
@@ -291,6 +296,72 @@ def test_wait_for_response_running(self, time_mock, operation_mock):
         assert operation_mock.call_count == 2
 
 
+class TestAsyncGKEPodHook:

Review Comment:
   ```suggestion
   class TestGKEPodAsyncHook:
   ```





[GitHub] [airflow] VladaZakharova closed pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova closed pull request #29266: Add deferrable mode to GKEStartPodOperator
URL: https://github.com/apache/airflow/pull/29266




[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143103080


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
 
-    @staticmethod
-    @contextmanager
-    def get_gke_config_file(
-        gcp_conn_id,
-        project_id: str | None,
-        cluster_name: str,
-        impersonation_chain: str | Sequence[str] | None,
-        regional: bool,
-        location: str,
-        use_internal_ip: bool,
-    ) -> Generator[str, None, None]:
-
-        hook = GoogleBaseHook(gcp_conn_id=gcp_conn_id)
-        project_id = project_id or hook.project_id
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
 
-        if not project_id:
-            raise AirflowException(
-                "The project id must be passed either as "
-                "keyword project_id parameter or as project_id extra "
-                "in Google Cloud connection definition. Both are not set!"
+    @cached_property
+    def hook(self) -> GKEPodHook:  # type: ignore[override]
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
             )
 
-        # Write config to a temp file and set the environment variable to point to it.
-        # This is to avoid race conditions of reading/writing a single file
-        with tempfile.NamedTemporaryFile() as conf_file, patch_environ(
-            {KUBE_CONFIG_ENV_VAR: conf_file.name}
-        ), hook.provide_authorized_gcloud():
-            # Attempt to get/update credentials
-            # We call gcloud directly instead of using google-cloud-python api
-            # because there is no way to write kubernetes config to a file, which is
-            # required by KubernetesPodOperator.
-            # The gcloud command looks at the env variable `KUBECONFIG` for where to save
-            # the kubernetes config file.
-            cmd = [
-                "gcloud",
-                "container",
-                "clusters",
-                "get-credentials",
-                cluster_name,
-                "--project",
-                project_id,
-            ]
-            if impersonation_chain:
-                if isinstance(impersonation_chain, str):
-                    impersonation_account = impersonation_chain
-                elif len(impersonation_chain) == 1:
-                    impersonation_account = impersonation_chain[0]
-                else:
-                    raise AirflowException(
-                        "Chained list of accounts is not supported, please specify only one service account"
-                    )
-
-                cmd.extend(
-                    [
-                        "--impersonate-service-account",
-                        impersonation_account,
-                    ]
-                )
-            if regional:
-                cmd.append("--region")
-            else:
-                cmd.append("--zone")
-            cmd.append(location)
-            if use_internal_ip:
-                cmd.append("--internal-ip")
-            execute_in_subprocess(cmd)
-
-            # Tell `KubernetesPodOperator` where the config file is located
-            yield os.environ[KUBE_CONFIG_ENV_VAR]
+        hook = GKEPodHook(
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
 
-    def execute(self, context: Context) -> str | None:
+    def execute(self, context: Context):
+        """Executes process of creating pod and executing provided command inside it."""
+        self.fetch_cluster_info()
+        return super().execute(context)
 
-        with GKEStartPodOperator.get_gke_config_file(
-            gcp_conn_id=self.gcp_conn_id,
+    def fetch_cluster_info(self) -> tuple[str, str | None]:
+        """Fetches cluster info for connecting to it."""
+        cluster = self.cluster_hook.get_cluster(

Review Comment:
   There is no strong need for an async call to get the cluster here, since this call is only used to get the cluster's endpoint and does not perform any other long-running action on it. The async get_cluster() method is called in the trigger.





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143059727


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -336,3 +343,195 @@ async def get_operation(
         return await client.get_operation(
             name=operation_path,
         )
+
+
+class GKEPodHook(GoogleBaseHook):
+    """Hook for managing Google Kubernetes Engine pod APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @property
+    def is_in_cluster(self) -> bool:
+        return False
+
+    @staticmethod
+    def get_xcom_sidecar_container_image():
+        """Returns the xcom sidecar image that defined in the connection"""
+        return PodDefaults.SIDECAR_CONTAINER.image
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        return client.ApiClient(configuration)
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+    def get_pod(self, name: str, namespace: str) -> V1Pod:
+        """
+        Gets pod's object.
+
+        :param name: Name of the pod.
+        :param namespace: Name of the pod's namespace.
+        """
+        return self.core_v1_client.read_namespaced_pod(
+            name=name,
+            namespace=namespace,
+        )
+
+
+class AsyncGKEPodHook(GoogleBaseAsyncHook):
+    """
+    Hook for managing Google Kubernetes Engine pods APIs in asynchronous way.
+
+    :param cluster_url: The URL pointed to the cluster.
+    :param ssl_ca_cert: SSL certificate that is used for authentication to the pod.
+    """
+
+    sync_hook_class = GKEPodHook
+    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        **kwargs,
+    ):
+
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+        kwargs.update(
+            cluster_url=cluster_url,
+            ssl_ca_cert=ssl_ca_cert,
+        )
+        super().__init__(**kwargs)
+
+    @contextlib.asynccontextmanager
+    async def get_conn(self, token: Token) -> async_client.ApiClient:  # type: ignore[override]

Review Comment:
   Done





[GitHub] [airflow] phanikumv commented on a diff in pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143025827


##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -336,3 +343,195 @@ async def get_operation(
         return await client.get_operation(
             name=operation_path,
         )
+
+
+class GKEPodHook(GoogleBaseHook):
+    """Hook for managing Google Kubernetes Engine pod APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @property
+    def is_in_cluster(self) -> bool:
+        return False
+
+    @staticmethod
+    def get_xcom_sidecar_container_image():
+        """Returns the xcom sidecar image that defined in the connection"""
+        return PodDefaults.SIDECAR_CONTAINER.image
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        return client.ApiClient(configuration)
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+    def get_pod(self, name: str, namespace: str) -> V1Pod:
+        """
+        Gets pod's object.
+
+        :param name: Name of the pod.
+        :param namespace: Name of the pod's namespace.
+        """
+        return self.core_v1_client.read_namespaced_pod(
+            name=name,
+            namespace=namespace,
+        )
+
+
+class AsyncGKEPodHook(GoogleBaseAsyncHook):

Review Comment:
   ```suggestion
   class GKEPodAsyncHook(GoogleBaseAsyncHook):
   ```
   Just to align with how async classes are named.



##########
airflow/providers/google/cloud/triggers/kubernetes_engine.py:
##########
@@ -18,14 +18,108 @@
 from __future__ import annotations
 
 import asyncio
+from datetime import datetime
 from typing import Any, AsyncIterator, Sequence
 
 from google.cloud.container_v1.types import Operation
 
-from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook
+try:
+    from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+except ImportError:
+    # preserve backward compatibility for older versions of cncf.kubernetes provider
+    from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import KubernetesPodTrigger
+from airflow.providers.google.cloud.hooks.kubernetes_engine import AsyncGKEHook, AsyncGKEPodHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
+class GKEPodTrigger(KubernetesPodTrigger):

Review Comment:
   Shouldn't this be `GKEStartPodTrigger` instead of `GKEPodTrigger`? It's being used only by `GKEStartPodOperator`.



##########
airflow/providers/google/cloud/hooks/kubernetes_engine.py:
##########
@@ -336,3 +343,195 @@ async def get_operation(
         return await client.get_operation(
             name=operation_path,
         )
+
+
+class GKEPodHook(GoogleBaseHook):
+    """Hook for managing Google Kubernetes Engine pod APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @property
+    def is_in_cluster(self) -> bool:
+        return False
+
+    @staticmethod
+    def get_xcom_sidecar_container_image():
+        """Returns the xcom sidecar image that defined in the connection"""
+        return PodDefaults.SIDECAR_CONTAINER.image
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        return client.ApiClient(configuration)
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+    def get_pod(self, name: str, namespace: str) -> V1Pod:
+        """
+        Gets pod's object.
+
+        :param name: Name of the pod.
+        :param namespace: Name of the pod's namespace.
+        """
+        return self.core_v1_client.read_namespaced_pod(
+            name=name,
+            namespace=namespace,
+        )
+
+
+class AsyncGKEPodHook(GoogleBaseAsyncHook):
+    """
+    Hook for managing Google Kubernetes Engine pods APIs in asynchronous way.
+
+    :param cluster_url: The URL pointed to the cluster.
+    :param ssl_ca_cert: SSL certificate that is used for authentication to the pod.
+    """
+
+    sync_hook_class = GKEPodHook
+    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        **kwargs,
+    ):
+
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+        kwargs.update(
+            cluster_url=cluster_url,
+            ssl_ca_cert=ssl_ca_cert,
+        )
+        super().__init__(**kwargs)
+
+    @contextlib.asynccontextmanager
+    async def get_conn(self, token: Token) -> async_client.ApiClient:  # type: ignore[override]

Review Comment:
   Why did we add `# type: ignore[override]` here?
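
The thread does not record an explanation, so the following is an assumption rather than the author's stated reason: mypy reports an `override` error when a subclass method's signature is incompatible with the one it overrides, for example when it adds a required `token` parameter and becomes an async context manager. The sketch uses hypothetical stand-in classes (`BaseHookSketch`, `FakeApiClient`, `AsyncPodHookSketch`) to show that shape.

```python
from __future__ import annotations

import asyncio
import contextlib
from typing import Any, AsyncIterator


class BaseHookSketch:
    """Hypothetical base class whose get_conn takes no arguments."""

    def get_conn(self) -> Any:
        raise NotImplementedError


class FakeApiClient:
    """Hypothetical stand-in for an async API client."""

    def __init__(self, token: str) -> None:
        self.token = token
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class AsyncPodHookSketch(BaseHookSketch):
    # The extra required parameter and the async context manager wrapper make
    # this override incompatible with the base signature, so mypy flags it.
    @contextlib.asynccontextmanager
    async def get_conn(self, token: str) -> AsyncIterator[FakeApiClient]:  # type: ignore[override]
        client = FakeApiClient(token)
        try:
            yield client
        finally:
            # Always release the client, even if the body raised.
            await client.close()


async def demo() -> FakeApiClient:
    async with AsyncPodHookSketch().get_conn("fake-token") as client:
        assert not client.closed
    return client


client = asyncio.run(demo())
```

An alternative that avoids the ignore comment would be to give the async method a different name than the base method, at the cost of diverging from the usual `get_conn` convention.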



##########
tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py:
##########
@@ -68,9 +68,11 @@
         in_cluster=False,
         is_delete_operator_pod=True,
         get_logs=True,
+        deferrable=True,
     )
 
-    pod_task_xcom = GKEStartPodOperator(
+    # [START howto_operator_gke_start_pod_xcom_async]
+    pod_task_xcom_async = GKEStartPodOperator(
         task_id="pod_task_xcom",

Review Comment:
   ```suggestion
           task_id="pod_task_xcom_async",
   ```





[GitHub] [airflow] potiuk merged pull request #29266: Add deferrable mode to GKEStartPodOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #29266:
URL: https://github.com/apache/airflow/pull/29266

