Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/12 06:35:00 UTC

[GitHub] [airflow] ryanyuan opened a new pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

ryanyuan opened a new pull request #18184:
URL: https://github.com/apache/airflow/pull/18184


   
   Migrate Google Cloud Build from Discovery API to Python SDK
   
   closes: #8568
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ryanyuan commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706827513



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body

Review comment:
       For now, I have set `build` as a required field and `body` as optional, so `build` can't be None. Do you think I should make both of them optional to keep the operator more backward compatible?







[GitHub] [airflow] ryanyuan commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r707284566



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body

Review comment:
       Hi @turbaszek, I've updated the code to accept both `build` and `body` as optional parameters. An AirflowException is raised if both of them are passed. PTAL.
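
The behaviour described in this comment can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `resolve_build` is a hypothetical helper name, the `AirflowException` class below is a local stand-in so the snippet runs without Airflow installed, and raising when neither argument is given is an assumption that the comment does not confirm.

```python
import warnings
from typing import Optional, Union


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def resolve_build(
    build: Optional[Union[dict, str]] = None,
    body: Optional[dict] = None,
) -> Union[dict, str]:
    """Pick the build definition from the new `build` or the deprecated `body` argument."""
    if build and body:
        # Behaviour stated in the review comment: passing both is an error.
        raise AirflowException("Pass either `build` or the deprecated `body`, not both.")
    if body:
        warnings.warn(
            "The body parameter has been deprecated. You should pass body using the build parameter.",
            DeprecationWarning,
            stacklevel=2,
        )
        return body
    if not build:
        # Assumption: the thread does not say what happens when neither is given.
        raise AirflowException("One of `build` or `body` is required.")
    return build
```

With this shape, existing DAGs that still pass `body` keep working and only see a deprecation warning, while ambiguous calls fail early.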







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775739



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body
+
+    def prepare_template(self) -> None:

Review comment:
   ```suggestion
       def _prepare_template(self) -> None:
   ```
   Or is this method part of the public interface?
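
For context on this question: Airflow's `BaseOperator` documents `prepare_template` as a hook that subclasses may override, so the base class looks the method up by its public name. The sketch below illustrates why renaming such a hook matters. It uses hypothetical stand-in classes (`BaseOperatorSketch`, `resolve_templates`, `PublicHookOperator`, `RenamedHookOperator`) and is not Airflow's actual implementation.

```python
class BaseOperatorSketch:
    """Hypothetical stand-in for a base class that exposes a public hook."""

    def resolve_templates(self) -> None:
        # The base class calls the hook by its public name.
        self.prepare_template()

    def prepare_template(self) -> None:
        """Hook for subclasses; does nothing by default."""


class PublicHookOperator(BaseOperatorSketch):
    def __init__(self) -> None:
        self.prepared = False

    def prepare_template(self) -> None:
        # Overrides the public hook, so the base class reaches it.
        self.prepared = True


class RenamedHookOperator(BaseOperatorSketch):
    def __init__(self) -> None:
        self.prepared = False

    def _prepare_template(self) -> None:
        # Never called: the base class only knows `prepare_template`.
        self.prepared = True


public_op = PublicHookOperator()
public_op.resolve_templates()

renamed_op = RenamedHookOperator()
renamed_op.resolve_templates()

print(public_op.prepared, renamed_op.prepared)
```

Under these assumptions, the underscore-prefixed variant is silently skipped, which is the risk behind asking whether the method belongs to the public interface.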







[GitHub] [airflow] github-actions[bot] commented on pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#issuecomment-918213996


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775478



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",

Review comment:
       ```suggestion
                   "The body parameter has been deprecated. You should pass body using the build parameter.",
   ```
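
For context on why the suggestion above is safe: Python joins adjacent string literals at compile time, so the split form in the diff (most likely left behind by an auto-formatter) and the single literal in the suggestion yield the same warning text. A minimal sketch; the variable names are illustrative only and do not appear in the PR:

```python
# Adjacent string literals are concatenated by the compiler, so both
# spellings below produce exactly the same message.
split_message = (
    "The body parameter has been deprecated. You should pass body using " "the build parameter."
)
joined_message = "The body parameter has been deprecated. You should pass body using the build parameter."

# The suggestion only changes how the literal is written, not its value.
assert split_message == joined_message
```

The change is therefore purely cosmetic and does not alter the operator's behaviour.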







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775303



##########
File path: airflow/providers/google/cloud/hooks/cloud_build.py
##########
@@ -55,95 +52,564 @@ class CloudBuildHook(GoogleBaseHook):
     :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    _conn = None  # type: Optional[Any]
-
     def __init__(
         self,
-        api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
         super().__init__(
-            gcp_conn_id=gcp_conn_id,
-            delegate_to=delegate_to,
-            impersonation_chain=impersonation_chain,
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain
         )
+        self._client: Optional[CloudBuildClient] = None
 
-        self.api_version = api_version
+    def get_conn(self) -> CloudBuildClient:
+        """
+        Retrieves the connection to Google Cloud Build.
 
-    def get_conn(self) -> build:
+        :return: Google Cloud Build client object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.CloudBuildClient`
         """
-        Retrieves the connection to Cloud Build.
+        if not self._client:
+            self._client = CloudBuildClient(credentials=self._get_credentials(), client_info=self.client_info)
+        return self._client
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Cancels a build in progress.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
 
-        :return: Google Cloud Build services object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
         """
-        if not self._conn:
-            http_authorized = self._authorize()
-            self._conn = build("cloudbuild", self.api_version, http=http_authorized, cache_discovery=False)
-        return self._conn
+        client = self.get_conn()
+
+        self.log.info("Start cancelling build: %s.", id)
+
+        build = client.cancel_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been cancelled: %s.", id)
+
+        return build
 
     @GoogleBaseHook.fallback_to_default_project_id
-    def create_build(self, body: dict, project_id: str) -> dict:
+    def create_build(
+        self,
+        build: Union[Dict, Build],
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
         """
         Starts a build with the specified configuration.
 
-        :param body: The request body.
-            See: https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
-        :type body: dict
+        :param build: The build resource to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+        :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build.")
+
+        operation = client.create_build(
+            request={'project_id': project_id, 'build': build},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")
+
+        if not wait:
+            return self.get_build(id=id, project_id=project_id)
+
+        operation.result()
+
+        self.log.info("Build has been created: %s.", id)
+
+        return self.get_build(id=id, project_id=project_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_build_trigger(
+        self,
+        trigger: Union[dict, BuildTrigger],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Creates a new BuildTrigger.
+
+        :param trigger: The BuildTrigger to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        :type trigger: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
         :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build trigger.")
+
+        trigger = client.create_build_trigger(
+            request={'project_id': project_id, 'trigger': trigger},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been created.")
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> None:
+        """
+        Deletes a BuildTrigger by its project ID and trigger ID.
+
+        :param trigger_id: The ID of the BuildTrigger to delete.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+        """
+        client = self.get_conn()
+
+        self.log.info("Start deleting build trigger: %s.", trigger_id)
+
+        client.delete_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been deleted: %s.", trigger_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Returns information about a previously requested build.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build: %s.", id)
+
+        build = client.get_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been retrieved: %s.", id)
+
+        return build
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Returns information about a BuildTrigger.
+
+        :param trigger_id: The ID of the BuildTrigger to get.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build trigger: %s.", trigger_id)
+
+        trigger = client.get_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been retrieved: %s.", trigger_id)
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_build_triggers(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[BuildTrigger]:
+        """
+        Lists existing BuildTriggers.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param location: The location of the project.
+        :type location: string
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
+
+        self.log.info("Start retrieving build triggers.")
+
+        response = client.list_build_triggers(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build triggers have been retrieved.")
+
+        return list(response.triggers)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_builds(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[int] = None,
+        filter: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[Build]:
+        """
+        Lists previously requested builds.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
             If set to None or missing, the default project_id from the Google Cloud connection is used.
         :type project_id: str
-        :return: Dict
+        :param location: The location of the project.
+        :type location: string
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param filter: Optional, the raw filter text to constrain the results.
+        :type filter: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.Build`]
         """
-        service = self.get_conn()
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
 
-        # Create build
-        response = (
-            service.projects()
-            .builds()
-            .create(projectId=project_id, body=body)
-            .execute(num_retries=self.num_retries)
+        self.log.info("Start retrieving builds.")
+
+        response = client.list_builds(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+                'filter': filter,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
         )
 
-        # Wait
-        operation_name = response["name"]
-        self._wait_for_operation_to_complete(operation_name=operation_name)
+        self.log.info("Builds have been retrieved.")
+
+        return list(response.builds)
 
-        # Get result
-        build_id = response["metadata"]["build"]["id"]
+    @GoogleBaseHook.fallback_to_default_project_id
+    def retry_build(
+        self,
+        id: str,
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Creates a new build based on the specified build. This method creates a new build
+        using the original build request, which may or may not result in an identical build.
 
-        result = (
-            service.projects()
-            .builds()
-            .get(projectId=project_id, id=build_id)
-            .execute(num_retries=self.num_retries)
+        :param id: Build ID of the original build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrying build: %s.", id)
+
+        operation = client.retry_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
         )
 
-        return result
-
-    def _wait_for_operation_to_complete(self, operation_name: str) -> None:
-        """
-        Waits for the named operation to complete - checks status of the
-        asynchronous call.
-
-        :param operation_name: The name of the operation.
-        :type operation_name: str
-        :return: The response returned by the operation.
-        :rtype: dict
-        :exception: AirflowException in case error is returned.
-        """
-        service = self.get_conn()
-        while True:
-            operation_response = (
-                service.operations().get(name=operation_name).execute(num_retries=self.num_retries)
-            )
-            if operation_response.get("done"):
-                response = operation_response.get("response")
-                error = operation_response.get("error")
-                # Note, according to documentation always either response or error is
-                # set when "done" == True
-                if error:
-                    raise AirflowException(str(error))
-                return response
-            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")
+
+        if not wait:
+            return self.get_build(id=id, project_id=project_id)
+
+        operation.result()
+
+        self.log.info("Build has been retried: %s.", id)
+
+        return self.get_build(id=id, project_id=project_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def run_build_trigger(
+        self,
+        trigger_id: str,
+        source: Union[dict, RepoSource],
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Runs a BuildTrigger at a particular source revision.
+
+        :param trigger_id: The ID of the trigger.
+        :type trigger_id: str
+        :param source: Source to build against this trigger. If a dict is provided, it must be of the
+            same form as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.RepoSource`
+        :type source: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.RepoSource`]
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start running build trigger: %s.", trigger_id)
+
+        operation = client.run_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id, 'source': source},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        print(operation)

Review comment:
       ```suggestion
   
   ```
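
The suggestion above simply deletes the stray `print(operation)`. If the returned operation were still wanted for debugging, one alternative (an assumption, not something proposed in this review) would be to log it at debug level, in the same style as the `self.log.info(...)` calls elsewhere in the hook. A minimal standalone sketch using only the standard library; the logger name and helper function are hypothetical:

```python
import logging

# Hypothetical logger name; inside the hook this role is played by self.log.
log = logging.getLogger("cloud_build_example")


def describe_operation(operation: object) -> None:
    """Log the operation instead of printing it to stdout."""
    # %-style arguments are only formatted if DEBUG logging is enabled.
    log.debug("Operation returned by run_build_trigger: %s", operation)
```

Unlike `print`, this output can be switched on or off through the logging configuration.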







[GitHub] [airflow] ryanyuan commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706827237



##########
File path: UPDATING.md
##########
@@ -1777,6 +1777,16 @@ previous one was (project_id, dataset_id, ...) (breaking change)
 - `get_tabledata` returns list of rows instead of API response in dict format. This method is deprecated in
  favor of `list_rows`. (breaking change)
 
+#### `airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook`

Review comment:
       I'm not quite sure which section I should update. Is it going to be 5.2.0?







[GitHub] [airflow] mnojek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
mnojek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r710893826



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,980 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource

Review comment:
       Installing this package helped. Thanks!







[GitHub] [airflow] eladkal commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706834594



##########
File path: UPDATING.md
##########
@@ -1777,6 +1777,16 @@ previous one was (project_id, dataset_id, ...) (breaking change)
 - `get_tabledata` returns list of rows instead of API response in dict format. This method is deprecated in
  favor of `list_rows`. (breaking change)
 
+#### `airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook`

Review comment:
       I think @potiuk creates the changelogs automatically, so there is no need to handle it.
   This PR forces a major release bump, as it contains a breaking change.







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775682



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body

Review comment:
       Should we add validation that only one of `build` and `body` can be passed? 
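
The validation asked about above could look roughly like the sketch below. It is a minimal, standalone illustration and not the code that was merged: `resolve_build` is a hypothetical helper name, and `ValueError` stands in for whatever exception the operator would actually raise (for example `AirflowException`).

```python
import warnings
from typing import Any, Dict, Optional


def resolve_build(
    build: Optional[Dict[str, Any]] = None,
    body: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the build config, accepting exactly one of `build` or `body`.

    Hypothetical helper: the name and the use of ValueError are assumptions.
    """
    if build and body:
        raise ValueError("Pass either `build` or the deprecated `body`, not both.")
    if not build and not body:
        raise ValueError("One of `build` or `body` is required.")
    if body:
        # `body` is still accepted, but callers are nudged towards `build`.
        warnings.warn(
            "The body parameter has been deprecated. Pass the build parameter instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return body
    return build
```

Rejecting the case where both are given avoids silently preferring one argument over the other, which is what the `if not self.build` fallback in the diff does today.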







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706776268



##########
File path: UPDATING.md
##########
@@ -1777,6 +1777,16 @@ previous one was (project_id, dataset_id, ...) (breaking change)
 - `get_tabledata` returns list of rows instead of API response in dict format. This method is deprecated in
  favor of `list_rows`. (breaking change)
 
+#### `airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook`

Review comment:
       We should also update `providers/google/CHANGELOG.rst`







[GitHub] [airflow] ryanyuan commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706827094



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body
+
+    def prepare_template(self) -> None:

Review comment:
       This overrides `prepare_template` from baseoperator.py. I agree it's more of an internal function. We can refactor it in a separate PR rather than in this one. WDYT?
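
One possible shape for such a refactor is to move the file-loading logic into a small module-level helper that the override then calls. The sketch below is an assumption, not part of this PR: `load_build_config` is a hypothetical name, and it handles only `.json` files so that it stays standard-library only (the operator in the diff also accepts `.yaml` and `.yml`).

```python
import json
from pathlib import Path
from typing import Any, Dict, Union


def load_build_config(build: Union[Dict[str, Any], str]) -> Dict[str, Any]:
    """Return `build` unchanged if it is a dict, otherwise read it from a .json file.

    Hypothetical helper; YAML support is deliberately left out of this sketch.
    """
    # Nothing to load when the caller already passed the config itself.
    if not isinstance(build, str):
        return build
    path = Path(build)
    if path.suffix != ".json":
        raise ValueError(f"Unsupported build file type: {path.suffix!r}")
    with path.open() as file:
        return json.load(file)
```

Unlike the two independent `if` checks in the diff, this version fails loudly on an unrecognised extension instead of leaving the path string in place as the build config.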







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706774409



##########
File path: UPDATING.md
##########
@@ -1777,6 +1777,16 @@ previous one was (project_id, dataset_id, ...) (breaking change)
 - `get_tabledata` returns list of rows instead of API response in dict format. This method is deprecated in
  favor of `list_rows`. (breaking change)
 
+#### `airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook`
+
+#### `airflow.providers.google.cloud.operators.cloud_build.CloudBuildCreateBuildOperator`
+
+The `api_version` has been removed and will not be used since we migrate CloudBuildHook from using

Review comment:
       ```suggestion
   The `api_version` has been removed and will not be used since we migrate `CloudBuildHook` from using
   ```







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706774942



##########
File path: airflow/providers/google/cloud/hooks/cloud_build.py
##########
@@ -55,95 +52,564 @@ class CloudBuildHook(GoogleBaseHook):
     :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    _conn = None  # type: Optional[Any]
-
     def __init__(
         self,
-        api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
         super().__init__(
-            gcp_conn_id=gcp_conn_id,
-            delegate_to=delegate_to,
-            impersonation_chain=impersonation_chain,
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain
         )
+        self._client: Optional[CloudBuildClient] = None
 
-        self.api_version = api_version
+    def get_conn(self) -> CloudBuildClient:
+        """
+        Retrieves the connection to Google Cloud Build.
 
-    def get_conn(self) -> build:
+        :return: Google Cloud Build client object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.CloudBuildClient`
         """
-        Retrieves the connection to Cloud Build.
+        if not self._client:
+            self._client = CloudBuildClient(credentials=self._get_credentials(), client_info=self.client_info)
+        return self._client
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_build(
+        self,
+        id: str,

Review comment:
       `id` is a Python [built-in function](https://docs.python.org/3/library/functions.html#id). Should we use something like `id_` instead?
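
To make the shadowing concern concrete, here is a small sketch. Both function names are hypothetical and exist only for illustration; neither is a hook method from this PR.

```python
def cancel_build_shadowing(id: str) -> bool:
    # Inside this function, `id` refers to the string argument,
    # so the built-in id() cannot be reached by that name.
    return callable(id)


def cancel_build_renamed(id_: str) -> bool:
    # With the trailing underscore, `id` still resolves to the built-in.
    return callable(id)


print(cancel_build_shadowing("abc"))  # False: `id` is the string "abc"
print(cancel_build_renamed("abc"))    # True: `id` is the built-in function
```

The trade-off is that renaming changes the keyword callers must use (`id_=...` instead of `id=...`), so it affects the public signature as well as the function body.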







[GitHub] [airflow] potiuk merged pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #18184:
URL: https://github.com/apache/airflow/pull/18184


   





[GitHub] [airflow] ryanyuan commented on pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#issuecomment-918655847


   Thanks @turbaszek @eladkal @potiuk 





[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706776117



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not a template field, so the original (unrendered) value is kept
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body
+
+    def prepare_template(self) -> None:
+        # if no file is specified, skip
+        if not isinstance(self.build_raw, str):
+            return
+        with open(self.build_raw) as file:
+            if any(self.build_raw.endswith(ext) for ext in ['.yaml', '.yml']):
+                self.build = yaml.load(file.read(), Loader=yaml.FullLoader)
+            if self.build_raw.endswith('.json'):
+                self.build = json.loads(file.read())
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+
+        build = BuildProcessor(build=self.build).process_body()
+
+        result = hook.create_build(
+            build=build,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildTriggerOperator(BaseOperator):
+    """
+    Creates a new BuildTrigger.
+
+    :param trigger: The BuildTrigger to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+    :type trigger: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger: Union[dict, BuildTrigger],
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger = trigger
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.create_build_trigger(
+            trigger=self.trigger,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
+
+
+class CloudBuildDeleteBuildTriggerOperator(BaseOperator):
+    """
+    Deletes a BuildTrigger by its project ID and trigger ID.
+
+    :param trigger_id: The ID of the BuildTrigger to delete.
+    :type trigger_id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("project_id", "trigger_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        hook.delete_build_trigger(
+            trigger_id=self.trigger_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+
+class CloudBuildGetBuildOperator(BaseOperator):
+    """
+    Returns information about a previously requested build.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.get_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildGetBuildTriggerOperator(BaseOperator):
+    """
+    Returns information about a BuildTrigger.
+
+    :param trigger_id: The ID of the BuildTrigger to get.
+    :type trigger_id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.get_build_trigger(
+            trigger_id=self.trigger_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
+
+
+class CloudBuildListBuildTriggersOperator(BaseOperator):
+    """
+    Lists existing BuildTriggers.
+
+    :param location: The location of the project.
+    :type location: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param page_size: Optional, number of results to return in the list.
+    :type page_size: Optional[int]
+    :param page_token: Optional, token to provide to skip to a particular spot in the list.
+    :type page_token: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("location", "project_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_build_triggers(
+            project_id=self.project_id,
+            location=self.location,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [BuildTrigger.to_dict(result) for result in results]
+
+
+class CloudBuildListBuildsOperator(BaseOperator):
+    """
+    Lists previously requested builds.
+
+    :param location: The location of the project.
+    :type location: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param page_size: Optional, number of results to return in the list.
+    :type page_size: Optional[int]
+    :param filter: Optional, the raw filter text to constrain the results.
+    :type filter: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("location", "project_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        filter: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.page_size = page_size
+        self.filter = filter
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_builds(
+            project_id=self.project_id,
+            location=self.location,
+            page_size=self.page_size,
+            filter=self.filter,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Build.to_dict(result) for result in results]
+
+
+class CloudBuildRetryBuildOperator(BaseOperator):
+    """
+    Creates a new build based on the specified build. This method creates a new build
+    using the original build request, which may or may not result in an identical build.
+
+    :param id: Build ID of the original build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.retry_build(
+            id=self.id,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildRunBuildTriggerOperator(BaseOperator):
+    """
+    Runs a BuildTrigger at a particular source revision.
+
+    :param trigger_id: The ID of the trigger.
+    :type trigger_id: str
+    :param source: Source to build against this trigger. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.RepoSource`
+    :type source: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.RepoSource`]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "source", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        source: Union[dict, RepoSource],
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.source = source
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.run_build_trigger(
+            trigger_id=self.trigger_id,
+            source=self.source,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildUpdateBuildTriggerOperator(BaseOperator):
+    """
+    Updates a BuildTrigger by its project ID and trigger ID.
+
+    :param trigger_id: The ID of the trigger.
+    :type trigger_id: str
+    :param trigger: The BuildTrigger to update. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+    :type trigger: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "trigger", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        trigger: Union[dict, BuildTrigger],
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.trigger = trigger
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.update_build_trigger(
+            trigger_id=self.trigger_id,
+            trigger=self.trigger,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
 
 
 class BuildProcessor:
     """
     Processes build configurations to add additional functionality to support the use of operators.
-
     The following improvements are made:
-
     * It is required to provide the source and only one type can be given,
     * It is possible to provide the source as a URL address instead of a dict.
 
-    :param body: The request body.
-        See: https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
-    :type body: dict
+    :param build: The request body of the build.
+        See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
+    :type build: Union[Dict, Build]
     """
 
-    def __init__(self, body: dict) -> None:
-        self.body = deepcopy(body)
+    def __init__(self, build: Union[Dict, Build]) -> None:
+        if isinstance(build, Build):
+            self.build = Build(build)
+        self.build = deepcopy(build)
 
     def _verify_source(self) -> None:
-        is_storage = "storageSource" in self.body["source"]
-        is_repo = "repoSource" in self.body["source"]
-
-        sources_count = sum([is_storage, is_repo])
-
-        if sources_count != 1:
+        if not (("storage_source" in self.build["source"]) ^ ("repo_source" in self.build["source"])):
             raise AirflowException(
                 "The source could not be determined. Please choose one data source from: "
-                "storageSource and repoSource."
+                "storage_source and repo_source."
             )
 
     def _reformat_source(self) -> None:
         self._reformat_repo_source()
         self._reformat_storage_source()
 
     def _reformat_repo_source(self) -> None:
-        if "repoSource" not in self.body["source"]:
+        if "repo_source" not in self.build["source"]:
             return
 
-        source = self.body["source"]["repoSource"]
+        source = self.build["source"]["repo_source"]

Review comment:
       ```suggestion
           repo_source = self.build["source"]["repo_source"]
   ```
   WDYT?
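
For reference, the `_verify_source` check in this hunk boils down to "exactly one of the two source keys is present". Below is a minimal standalone sketch of that rule using plain dicts. The helper name `has_exactly_one_source` is hypothetical and is not part of the PR.

```python
from typing import Any, Mapping


def has_exactly_one_source(source: Mapping[str, Any]) -> bool:
    """Return True when exactly one of the two supported source keys is present.

    Hypothetical helper for illustration only; the PR does this inline.
    """
    has_storage = "storage_source" in source
    has_repo = "repo_source" in source
    # For two booleans, XOR is True only when exactly one of them is True.
    return has_storage ^ has_repo


if __name__ == "__main__":
    print(has_exactly_one_source({"repo_source": {}}))
    print(has_exactly_one_source({"repo_source": {}, "storage_source": {}}))
    print(has_exactly_one_source({}))
```

With only two candidate keys, the XOR form and the previous `sum([...]) != 1` form accept and reject the same inputs.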







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706774453



##########
File path: UPDATING.md
##########
@@ -1777,6 +1777,16 @@ previous one was (project_id, dataset_id, ...) (breaking change)
 - `get_tabledata` returns list of rows instead of API response in dict format. This method is deprecated in
  favor of `list_rows`. (breaking change)
 
+#### `airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook`
+
+#### `airflow.providers.google.cloud.operators.cloud_build.CloudBuildCreateBuildOperator`
+
+The `api_version` has been removed and will not be used since we migrate CloudBuildHook from using
+ Discovery API to native google-cloud-build python library.
+
+The `body` in CloudBuildCreateBuildOperator parameter has been deprecated.

Review comment:
       ```suggestion
   The `body` parameter in `CloudBuildCreateBuildOperator` has been deprecated.
   ```







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706774724



##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_build.py
##########
@@ -74,42 +108,148 @@
 }
 # [END howto_operator_create_build_from_repo_body]
 
+
 with models.DAG(
     "example_gcp_cloud_build",
     default_args=dict(start_date=dates.days_ago(1)),
     schedule_interval='@once',
-    tags=['example'],
-) as dag:
+    tags=["example"],
+) as build_dag:
+
     # [START howto_operator_create_build_from_storage]
     create_build_from_storage = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_storage", project_id=GCP_PROJECT_ID, body=create_build_from_storage_body
+        task_id="create_build_from_storage", project_id=GCP_PROJECT_ID, build=create_build_from_storage_body
     )
     # [END howto_operator_create_build_from_storage]
 
     # [START howto_operator_create_build_from_storage_result]
     create_build_from_storage_result = BashOperator(
-        bash_command="echo '{{ task_instance.xcom_pull('create_build_from_storage')['images'][0] }}'",
+        bash_command="echo '{{ task_instance.xcom_pull('create_build_from_storage')['results'] }}'",
         task_id="create_build_from_storage_result",
     )
     # [END howto_operator_create_build_from_storage_result]
 
+    # [START howto_operator_create_build_from_repo]
     create_build_from_repo = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_repo", project_id=GCP_PROJECT_ID, body=create_build_from_repo_body
+        task_id="create_build_from_repo", project_id=GCP_PROJECT_ID, build=create_build_from_repo_body
     )
+    # [END howto_operator_create_build_from_repo]
 
+    # [START howto_operator_create_build_from_repo_result]
     create_build_from_repo_result = BashOperator(
-        bash_command="echo '{{ task_instance.xcom_pull('create_build_from_repo')['images'][0] }}'",
+        bash_command="echo '{{ task_instance.xcom_pull('create_build_from_repo')['results'] }}'",
         task_id="create_build_from_repo_result",
     )
+    # [END howto_operator_create_build_from_repo_result]
+
+    # [START howto_operator_list_builds]
+    list_builds = CloudBuildListBuildsOperator(
+        task_id="list_builds", project_id=GCP_PROJECT_ID, location="global"
+    )
+    # [END howto_operator_list_builds]
+
+    # [START howto_operator_create_build_without_wait]
+    create_build_without_wait = CloudBuildCreateBuildOperator(
+        task_id="create_build_without_wait",
+        project_id=GCP_PROJECT_ID,
+        build=create_build_from_repo_body,
+        wait=False,
+    )
+    # [END howto_operator_create_build_without_wait]
+
+    # [START howto_operator_cancel_build]
+    cancel_build = CloudBuildCancelBuildOperator(
+        task_id="cancel_build",
+        id="{{ task_instance.xcom_pull('create_build_without_wait')['id'] }}",

Review comment:
       ```suggestion
           id=create_build_without_wait.output['id'],
   ```
   Let's stick to the TaskFlow API here as we do in other examples.







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706853992



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body
+
+    def prepare_template(self) -> None:

Review comment:
       Sounds good to me







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706853939



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body

Review comment:
       Yes, I think that for the sake of backward compatibility we should keep both arguments as options and do proper validation.
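
One way the validation could look, as a rough sketch rather than the PR's actual code: treat both arguments as optional, warn when the deprecated `body` is used, and fail early when neither or both are given. The helper name `resolve_build` is hypothetical, rejecting both arguments together is an assumption about what "proper validation" means, and `ValueError` stands in for `AirflowException` so the snippet runs without Airflow installed.

```python
import warnings
from typing import Any, Dict, Optional


def resolve_build(
    build: Optional[Dict[str, Any]] = None,
    body: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Pick the build definition, accepting the deprecated ``body`` as a fallback.

    Hypothetical helper; ValueError is used here in place of AirflowException.
    """
    if build is not None and body is not None:
        # Assumption: passing both is ambiguous, so it is rejected.
        raise ValueError("Pass either `build` or the deprecated `body`, not both.")
    if body is not None:
        warnings.warn(
            "The body parameter has been deprecated. Pass the value using the build parameter.",
            DeprecationWarning,
            stacklevel=2,
        )
        return body
    if build is None:
        raise ValueError("The `build` parameter is required.")
    return build


if __name__ == "__main__":
    print(resolve_build(build={"steps": []}))
```

In the operator, `build` would then presumably need a default of `None` in the signature so that callers who still pass only `body` are not rejected before this check runs.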







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775285



##########
File path: airflow/providers/google/cloud/hooks/cloud_build.py
##########
@@ -55,95 +52,564 @@ class CloudBuildHook(GoogleBaseHook):
     :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    _conn = None  # type: Optional[Any]
-
     def __init__(
         self,
-        api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
         super().__init__(
-            gcp_conn_id=gcp_conn_id,
-            delegate_to=delegate_to,
-            impersonation_chain=impersonation_chain,
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain
         )
+        self._client: Optional[CloudBuildClient] = None
 
-        self.api_version = api_version
+    def get_conn(self) -> CloudBuildClient:
+        """
+        Retrieves the connection to Google Cloud Build.
 
-    def get_conn(self) -> build:
+        :return: Google Cloud Build client object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.CloudBuildClient`
         """
-        Retrieves the connection to Cloud Build.
+        if not self._client:
+            self._client = CloudBuildClient(credentials=self._get_credentials(), client_info=self.client_info)
+        return self._client
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Cancels a build in progress.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
 
-        :return: Google Cloud Build services object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
         """
-        if not self._conn:
-            http_authorized = self._authorize()
-            self._conn = build("cloudbuild", self.api_version, http=http_authorized, cache_discovery=False)
-        return self._conn
+        client = self.get_conn()
+
+        self.log.info("Start cancelling build: %s.", id)
+
+        build = client.cancel_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been cancelled: %s.", id)
+
+        return build
 
     @GoogleBaseHook.fallback_to_default_project_id
-    def create_build(self, body: dict, project_id: str) -> dict:
+    def create_build(
+        self,
+        build: Union[Dict, Build],
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
         """
         Starts a build with the specified configuration.
 
-        :param body: The request body.
-            See: https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
-        :type body: dict
+        :param build: The build resource to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+        :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build.")
+
+        operation = client.create_build(
+            request={'project_id': project_id, 'build': build},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")
+
+        if not wait:
+            return self.get_build(id=id, project_id=project_id)
+
+        operation.result()
+
+        self.log.info("Build has been created: %s.", id)
+
+        return self.get_build(id=id, project_id=project_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_build_trigger(
+        self,
+        trigger: Union[dict, BuildTrigger],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Creates a new BuildTrigger.
+
+        :param trigger: The BuildTrigger to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        :type trigger: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
         :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build trigger.")
+
+        trigger = client.create_build_trigger(
+            request={'project_id': project_id, 'trigger': trigger},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been created.")
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> None:
+        """
+        Deletes a BuildTrigger by its project ID and trigger ID.
+
+        :param trigger_id: The ID of the BuildTrigger to delete.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+        """
+        client = self.get_conn()
+
+        self.log.info("Start deleting build trigger: %s.", trigger_id)
+
+        client.delete_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been deleted: %s.", trigger_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Returns information about a previously requested build.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build: %s.", id)
+
+        build = client.get_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been retrieved: %s.", id)
+
+        return build
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Returns information about a BuildTrigger.
+
+        :param trigger_id: The ID of the BuildTrigger to get.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build trigger: %s.", trigger_id)
+
+        trigger = client.get_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been retrieved: %s.", trigger_id)
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_build_triggers(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[BuildTrigger]:
+        """
+        Lists existing BuildTriggers.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param location: The location of the project.
+        :type location: str
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+        """
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
+
+        self.log.info("Start retrieving build triggers.")
+
+        response = client.list_build_triggers(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build triggers have been retrieved.")
+
+        return list(response.triggers)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_builds(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[Build]:
+        """
+        Lists previously requested builds.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
             If set to None or missing, the default project_id from the Google Cloud connection is used.
         :type project_id: str
-        :return: Dict
+        :param location: The location of the project.
+        :type location: str
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param filter: Optional, the raw filter text to constrain the results.
+        :type filter: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.Build`]
         """
-        service = self.get_conn()
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
 
-        # Create build
-        response = (
-            service.projects()
-            .builds()
-            .create(projectId=project_id, body=body)
-            .execute(num_retries=self.num_retries)
+        self.log.info("Start retrieving builds.")
+
+        response = client.list_builds(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+                'filter': filter,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
         )
 
-        # Wait
-        operation_name = response["name"]
-        self._wait_for_operation_to_complete(operation_name=operation_name)
+        self.log.info("Builds have been retrieved.")
+
+        return list(response.builds)
 
-        # Get result
-        build_id = response["metadata"]["build"]["id"]
+    @GoogleBaseHook.fallback_to_default_project_id
+    def retry_build(
+        self,
+        id: str,
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Creates a new build based on the specified build. This method creates a new build
+        using the original build request, which may or may not result in an identical build.
 
-        result = (
-            service.projects()
-            .builds()
-            .get(projectId=project_id, id=build_id)
-            .execute(num_retries=self.num_retries)
+        :param id: Build ID of the original build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrying build: %s.", id)
+
+        operation = client.retry_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
         )
 
-        return result
-
-    def _wait_for_operation_to_complete(self, operation_name: str) -> None:
-        """
-        Waits for the named operation to complete - checks status of the
-        asynchronous call.
-
-        :param operation_name: The name of the operation.
-        :type operation_name: str
-        :return: The response returned by the operation.
-        :rtype: dict
-        :exception: AirflowException in case error is returned.
-        """
-        service = self.get_conn()
-        while True:
-            operation_response = (
-                service.operations().get(name=operation_name).execute(num_retries=self.num_retries)
-            )
-            if operation_response.get("done"):
-                response = operation_response.get("response")
-                error = operation_response.get("error")
-                # Note, according to documentation always either response or error is
-                # set when "done" == True
-                if error:
-                    raise AirflowException(str(error))
-                return response
-            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")

Review comment:
       We are doing this in at least two operators. Do you think it would be worth introducing a `get_id_from_operation` method in the hook to keep this behaviour consistent?
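
For illustration only, one possible shape for such a helper is sketched below. It is not taken from the PR: the name `get_id_from_operation` comes from the comment above, while the local `AirflowException` class and the `SimpleNamespace` objects are stand-ins (assumptions) so the snippet runs without Airflow or the Google client installed. In the hook, the argument would be the operation object returned by `client.create_build` or `client.retry_build`.

```python
from types import SimpleNamespace


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def get_id_from_operation(operation) -> str:
    """Return the build ID stored in the operation's metadata.

    Mirrors the try/except block repeated in the diff: any failure while
    reading ``operation.metadata.build.id`` is re-raised as AirflowException.
    """
    try:
        return operation.metadata.build.id
    except Exception as err:
        raise AirflowException("Could not retrieve Build ID from Operation.") from err


# Hypothetical operation object that exposes only the attributes the helper reads.
fake_operation = SimpleNamespace(
    metadata=SimpleNamespace(build=SimpleNamespace(id="build-123"))
)
print(get_id_from_operation(fake_operation))
```

With a helper like this, `create_build` and `retry_build` could both call it instead of repeating the `try`/`except` block.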







[GitHub] [airflow] turbaszek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706775285



##########
File path: airflow/providers/google/cloud/hooks/cloud_build.py
##########
@@ -55,95 +52,564 @@ class CloudBuildHook(GoogleBaseHook):
     :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    _conn = None  # type: Optional[Any]
-
     def __init__(
         self,
-        api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
     ) -> None:
         super().__init__(
-            gcp_conn_id=gcp_conn_id,
-            delegate_to=delegate_to,
-            impersonation_chain=impersonation_chain,
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain
         )
+        self._client: Optional[CloudBuildClient] = None
 
-        self.api_version = api_version
+    def get_conn(self) -> CloudBuildClient:
+        """
+        Retrieves the connection to Google Cloud Build.
 
-    def get_conn(self) -> build:
+        :return: Google Cloud Build client object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.CloudBuildClient`
         """
-        Retrieves the connection to Cloud Build.
+        if not self._client:
+            self._client = CloudBuildClient(credentials=self._get_credentials(), client_info=self.client_info)
+        return self._client
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Cancels a build in progress.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
 
-        :return: Google Cloud Build services object.
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
         """
-        if not self._conn:
-            http_authorized = self._authorize()
-            self._conn = build("cloudbuild", self.api_version, http=http_authorized, cache_discovery=False)
-        return self._conn
+        client = self.get_conn()
+
+        self.log.info("Start cancelling build: %s.", id)
+
+        build = client.cancel_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been cancelled: %s.", id)
+
+        return build
 
     @GoogleBaseHook.fallback_to_default_project_id
-    def create_build(self, body: dict, project_id: str) -> dict:
+    def create_build(
+        self,
+        build: Union[Dict, Build],
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
         """
         Starts a build with the specified configuration.
 
-        :param body: The request body.
-            See: https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
-        :type body: dict
+        :param build: The build resource to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+        :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build.")
+
+        operation = client.create_build(
+            request={'project_id': project_id, 'build': build},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")
+
+        if not wait:
+            return self.get_build(id=id, project_id=project_id)
+
+        operation.result()
+
+        self.log.info("Build has been created: %s.", id)
+
+        return self.get_build(id=id, project_id=project_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_build_trigger(
+        self,
+        trigger: Union[dict, BuildTrigger],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Creates a new BuildTrigger.
+
+        :param trigger: The BuildTrigger to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        :type trigger: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
         :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build trigger.")
+
+        trigger = client.create_build_trigger(
+            request={'project_id': project_id, 'trigger': trigger},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been created.")
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> None:
+        """
+        Deletes a BuildTrigger by its project ID and trigger ID.
+
+        :param trigger_id: The ID of the BuildTrigger to delete.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+        """
+        client = self.get_conn()
+
+        self.log.info("Start deleting build trigger: %s.", trigger_id)
+
+        client.delete_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been deleted: %s.", trigger_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build(
+        self,
+        id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Returns information about a previously requested build.
+
+        :param id: The ID of the build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build: %s.", id)
+
+        build = client.get_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+        self.log.info("Build has been retrieved: %s.", id)
+
+        return build
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_build_trigger(
+        self,
+        trigger_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> BuildTrigger:
+        """
+        Returns information about a BuildTrigger.
+
+        :param trigger_id: The ID of the BuildTrigger to get.
+        :type trigger_id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrieving build trigger: %s.", trigger_id)
+
+        trigger = client.get_build_trigger(
+            request={'project_id': project_id, 'trigger_id': trigger_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build trigger has been retrieved: %s.", trigger_id)
+
+        return trigger
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_build_triggers(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[BuildTrigger]:
+        """
+        Lists existing BuildTriggers.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param location: The location of the project.
+        :type location: str
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+        """
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
+
+        self.log.info("Start retrieving build triggers.")
+
+        response = client.list_build_triggers(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+        self.log.info("Build triggers have been retrieved.")
+
+        return list(response.triggers)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_builds(
+        self,
+        project_id: str,
+        location: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> List[Build]:
+        """
+        Lists previously requested builds.
+
+        :param project_id: Google Cloud Project project_id where the function belongs.
             If set to None or missing, the default project_id from the Google Cloud connection is used.
         :type project_id: str
-        :return: Dict
+        :param location: The location of the project.
+        :type location: str
+        :param page_size: Optional, number of results to return in the list.
+        :type page_size: Optional[int]
+        :param page_token: Optional, token to provide to skip to a particular spot in the list.
+        :type page_token: Optional[str]
+        :param filter: Optional, the raw filter text to constrain the results.
+        :type filter: Optional[str]
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.Build`]
         """
-        service = self.get_conn()
+        client = self.get_conn()
+
+        parent = f"projects/{project_id}/locations/{location}"
 
-        # Create build
-        response = (
-            service.projects()
-            .builds()
-            .create(projectId=project_id, body=body)
-            .execute(num_retries=self.num_retries)
+        self.log.info("Start retrieving builds.")
+
+        response = client.list_builds(
+            request={
+                'parent': parent,
+                'project_id': project_id,
+                'page_size': page_size,
+                'page_token': page_token,
+                'filter': filter,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
         )
 
-        # Wait
-        operation_name = response["name"]
-        self._wait_for_operation_to_complete(operation_name=operation_name)
+        self.log.info("Builds have been retrieved.")
+
+        return list(response.builds)
 
-        # Get result
-        build_id = response["metadata"]["build"]["id"]
+    @GoogleBaseHook.fallback_to_default_project_id
+    def retry_build(
+        self,
+        id: str,
+        project_id: str,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Build:
+        """
+        Creates a new build based on the specified build. This method creates a new build
+        using the original build request, which may or may not result in an identical build.
 
-        result = (
-            service.projects()
-            .builds()
-            .get(projectId=project_id, id=build_id)
-            .execute(num_retries=self.num_retries)
+        :param id: Build ID of the original build.
+        :type id: str
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param wait: Optional, wait for operation to finish.
+        :type wait: Optional[bool]
+        :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+            will not be retried.
+        :type retry: Optional[Retry]
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :type timeout: Optional[float]
+        :param metadata: Optional, additional metadata that is provided to the method.
+        :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+        :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+        """
+        client = self.get_conn()
+
+        self.log.info("Start retrying build: %s.", id)
+
+        operation = client.retry_build(
+            request={'project_id': project_id, 'id': id}, retry=retry, timeout=timeout, metadata=metadata
         )
 
-        return result
-
-    def _wait_for_operation_to_complete(self, operation_name: str) -> None:
-        """
-        Waits for the named operation to complete - checks status of the
-        asynchronous call.
-
-        :param operation_name: The name of the operation.
-        :type operation_name: str
-        :return: The response returned by the operation.
-        :rtype: dict
-        :exception: AirflowException in case error is returned.
-        """
-        service = self.get_conn()
-        while True:
-            operation_response = (
-                service.operations().get(name=operation_name).execute(num_retries=self.num_retries)
-            )
-            if operation_response.get("done"):
-                response = operation_response.get("response")
-                error = operation_response.get("error")
-                # Note, according to documentation always either response or error is
-                # set when "done" == True
-                if error:
-                    raise AirflowException(str(error))
-                return response
-            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
+        try:
+            id = operation.metadata.build.id
+        except Exception:
+            raise AirflowException("Could not retrieve Build ID from Operation.")

Review comment:
       We are doing this in at least two methods. Do you think it would be worth introducing a `get_id_from_operation` method to keep this behaviour consistent?
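
A minimal sketch of what such a helper could look like, assuming it simply wraps the `operation.metadata.build.id` lookup from the diff above. The name `get_id_from_operation` comes from this comment; the local `AirflowException` class and the `SimpleNamespace` object are stand-ins so the snippet runs without Airflow or the Cloud Build client installed.

```python
from types import SimpleNamespace


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException (assumption, for a self-contained sketch)."""


def get_id_from_operation(operation) -> str:
    """Return the build ID stored in an operation's metadata (hypothetical helper)."""
    try:
        return operation.metadata.build.id
    except Exception as err:
        # Same message as in the diff; chaining keeps the original cause visible.
        raise AirflowException("Could not retrieve Build ID from Operation.") from err


# Fake operation object: only the attribute path metadata.build.id matters here.
fake_operation = SimpleNamespace(
    metadata=SimpleNamespace(build=SimpleNamespace(id="build-123"))
)
print(get_id_from_operation(fake_operation))
```

Each hook method that currently repeats the `try`/`except` block could then call the helper instead, so the error message stays the same everywhere.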







[GitHub] [airflow] mnojek commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
mnojek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r709972844



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,980 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource

Review comment:
       I am working locally on fixing system tests, and this change broke the system test `CloudSqlProxySystemTest::test_run_example_dag_cloudsql_query` from `test_cloud_sql_system.py` for me.
   It passed before, but now I get this error:
   ```
   ERROR    airflow.models.dagbag.DagBag:dagbag.py:329 Failed to import: /opt/airflow/airflow/providers/google/cloud/example_dags/example_cloud_build.py
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagbag.py", line 326, in _load_modules_from_file
       loader.exec_module(new_module)
     File "<frozen importlib._bootstrap_external>", line 678, in exec_module
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/opt/airflow/airflow/providers/google/cloud/example_dags/example_cloud_build.py", line 38, in <module>
       from airflow.providers.google.cloud.operators.cloud_build import (
     File "/opt/airflow/airflow/providers/google/cloud/operators/cloud_build.py", line 30, in <module>
       from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
   ModuleNotFoundError: No module named 'google.cloud.devtools'
   ```
   I also cannot find this package on PyPI.
   Is it an internal package that is not available to others, or am I doing something wrong?







[GitHub] [airflow] ryanyuan commented on a change in pull request #18184: Migrate Google Cloud Build from Discovery API to Python SDK

Posted by GitBox <gi...@apache.org>.
ryanyuan commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r709996625



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,980 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource

Review comment:
       Hey @mnojek, `google.cloud.devtools.cloudbuild_v1` comes from here:
   https://github.com/apache/airflow/blob/main/setup.py#L296
   
   Could you try rebuilding your Docker image or running `pip install google-cloud-build`?
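
To confirm whether the module is visible in the current environment before and after reinstalling, something like the following could be used. It is a sketch that relies only on the standard library; `is_importable` is a hypothetical helper name, not part of Airflow.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if `module_name` can be located in the current environment."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # find_spec imports parent packages first, so a missing parent
        # package raises ModuleNotFoundError (a subclass of ImportError).
        return False


print(is_importable("google.cloud.devtools.cloudbuild_v1"))
```

If this prints `False` inside the container, the dependency is not installed there, which would match the `ModuleNotFoundError` in the traceback.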



