You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "VladaZakharova (via GitHub)" <gi...@apache.org> on 2024/02/21 17:21:51 UTC

[PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

VladaZakharova opened a new pull request, #37598:
URL: https://github.com/apache/airflow/pull/37598

   This PR adds new operators to cover Describe and List Jobs functionality.
   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499136296


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
+        KubernetesEngineJobLink.persist(context=context, task_instance=self)
+        return None
+
+
+class GKEListJobsOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve list of Jobs.
+
+    If namespace parameter is specified, the list of Jobs from dedicated
+    namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEListJobsOperator`
+
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineWorkloadsLink(),)
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        namespace: str | None = None,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        if self.namespace:
+            jobs = self.hook.list_jobs_from_namespace(namespace=self.namespace)
+        else:
+            jobs = self.hook.list_jobs_all_namespaces()
+        for job in jobs.items:
+            self.log.info("Retrieved description of Job:\n %s", job)
+        KubernetesEngineWorkloadsLink.persist(context=context, task_instance=self)
+        return None

Review Comment:
   yes, indeed, thank you, i will change the code :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499090215


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:
+        """Get Job of specified name from Google Cloud.
+
+        :param job_name: Name of Job to fetch.
+        :param namespace: Namespace of the Job.
+        :return: Job object
+        """
+        return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)
+
+    def list_jobs_all_namespaces(self) -> V1JobList:
+        """Get list of Jobs from all namespaces.
+
+        :return: V1JobList object
+        """
+        return self.batch_v1_client.list_job_for_all_namespaces(pretty=True)
+
+    def list_jobs_from_namespace(self, namespace: str) -> V1JobList:
+        """Get list of Jobs from dedicated namespace.
+
+        :param namespace: Namespace of the Job.
+        :return: V1JobList object
+        """
+        return self.batch_v1_client.list_namespaced_job(namespace=namespace, pretty=True)
+

Review Comment:
   yes, sounds like a good idea :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499134464


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )

Review Comment:
   yes, you are right, i will remove this one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499136910


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)

Review Comment:
   yes, indeed, thank you, i will change the code :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499135451


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
+        KubernetesEngineJobLink.persist(context=context, task_instance=self)
+        return None
+
+
+class GKEListJobsOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve list of Jobs.
+
+    If namespace parameter is specified, the list of Jobs from dedicated
+    namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEListJobsOperator`
+
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineWorkloadsLink(),)
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        namespace: str | None = None,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )

Review Comment:
   good idea, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova closed pull request #37598: Add GKEListJobsOperator and GKEDescribeJobOperator
URL: https://github.com/apache/airflow/pull/37598


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1498150187


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:
+        """Get Job of specified name from Google Cloud.
+
+        :param job_name: Name of Job to fetch.
+        :param namespace: Namespace of the Job.
+        :return: Job object
+        """
+        return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)
+
+    def list_jobs_all_namespaces(self) -> V1JobList:
+        """Get list of Jobs from all namespaces.
+
+        :return: V1JobList object
+        """
+        return self.batch_v1_client.list_job_for_all_namespaces(pretty=True)
+
+    def list_jobs_from_namespace(self, namespace: str) -> V1JobList:
+        """Get list of Jobs from dedicated namespace.
+
+        :param namespace: Namespace of the Job.
+        :return: V1JobList object
+        """
+        return self.batch_v1_client.list_namespaced_job(namespace=namespace, pretty=True)
+

Review Comment:
   At some point, we might need to split the hook in `KubernetesPodHook` and `KubernetesJobHook`, but it's fine for now.



##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:
+        """Get Job of specified name from Google Cloud.
+
+        :param job_name: Name of Job to fetch.
+        :param namespace: Namespace of the Job.
+        :return: Job object
+        """
+        return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)

Review Comment:
   `pretty` will print something, no?



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)

Review Comment:
   You can delete this two xcom_push and return both values as a dict or as a single string `f"{self.job.metadata.namespace}.{self.job.metadata.namespace}"`



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
+        KubernetesEngineJobLink.persist(context=context, task_instance=self)
+        return None
+
+
+class GKEListJobsOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve list of Jobs.
+
+    If namespace parameter is specified, the list of Jobs from dedicated
+    namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEListJobsOperator`
+
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineWorkloadsLink(),)
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        namespace: str | None = None,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        if self.namespace:
+            jobs = self.hook.list_jobs_from_namespace(namespace=self.namespace)
+        else:
+            jobs = self.hook.list_jobs_all_namespaces()
+        for job in jobs.items:
+            self.log.info("Retrieved description of Job:\n %s", job)
+        KubernetesEngineWorkloadsLink.persist(context=context, task_instance=self)
+        return None

Review Comment:
   It would be nice to return the list, if the user sets `do_xcom_push` to `True` (default value), they will be pushed to the task result.



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )

Review Comment:
   Since it's a new operator, IMHO there is no need for this exception, especially the message `has become required`.



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -898,3 +899,232 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKEDescribeJobOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve information about Job by given name.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDescribeJobOperator`
+
+    :param job_name: The name of the resource to delete, in this case cluster name.
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "job_name",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineJobLink(),)
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        location: str,
+        namespace: str,
+        cluster_name: str,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.job_name = job_name
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self.job: V1Job | None = None
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        hook = GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+        return hook
+
+    def execute(self, context: Context) -> None:
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
+        self.log.info(
+            "Retrieved description of Job %s from cluster %s:\n %s",
+            self.job_name,
+            self.cluster_name,
+            self.job,
+        )
+        ti = context["ti"]
+        ti.xcom_push(key="job_name", value=self.job.metadata.name)
+        ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
+        KubernetesEngineJobLink.persist(context=context, task_instance=self)
+        return None
+
+
+class GKEListJobsOperator(GoogleCloudBaseOperator):
+    """
+    Retrieve list of Jobs.
+
+    If namespace parameter is specified, the list of Jobs from dedicated
+    namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEListJobsOperator`
+
+    :param project_id: The Google Developers Console project id.
+    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
+        resides.
+    :param cluster_name: The name of the Google Kubernetes Engine cluster.
+    :param namespace: The name of the Google Kubernetes Engine namespace.
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "gcp_conn_id",
+        "namespace",
+        "cluster_name",
+        "location",
+        "impersonation_chain",
+    )
+    operator_extra_links = (KubernetesEngineWorkloadsLink(),)
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        namespace: str | None = None,
+        project_id: str | None = None,
+        use_internal_ip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.namespace = namespace
+        self.cluster_name = cluster_name
+        self.use_internal_ip = use_internal_ip
+        self.impersonation_chain = impersonation_chain
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )

Review Comment:
   I think moving the assignment block to this property will ensure that they are defined, in case you add it for user sub-classes:
   ```python
           self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
               cluster_name=self.cluster_name,
               project_id=self.project_id,
               use_internal_ip=self.use_internal_ip,
               cluster_hook=self.cluster_hook,
           ).fetch_cluster_info()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1498175502


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:

Review Comment:
   I'm wondering, if this is required for the Google provider, why don't we add these methods to the Google providers? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #37598:
URL: https://github.com/apache/airflow/pull/37598


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499134149


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:
+        """Get Job of specified name from Google Cloud.
+
+        :param job_name: Name of Job to fetch.
+        :param namespace: Namespace of the Job.
+        :return: Job object
+        """
+        return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)

Review Comment:
   According to documentation, this parameter is indeed for outputting everything pretty :)
   https://github.com/kubernetes-client/python/blob/master/kubernetes/client/api/batch_v1_api.py#L2674
   i have set this one to True, since i am using those api calls for output of the information 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add GKEListJobsOperator and GKEDescribeJobOperator [airflow]

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on code in PR #37598:
URL: https://github.com/apache/airflow/pull/37598#discussion_r1499089423


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -502,6 +503,30 @@ def create_job(
             raise e
         return resp
 
+    def get_job(self, job_name: str, namespace: str) -> V1Job:

Review Comment:
   For this purpose i found it very convenient to use kubernetes api, and since we already had a dedicated hook for Job in kubernetes, i have only extended its functionality 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org