You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/14 17:40:01 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library

mik-laj commented on a change in pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#discussion_r425316910



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,186 +15,678 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrat with Google Cloud Build service."""
-import re
-from copy import deepcopy
-from typing import Any, Dict, Iterable, Optional
-from urllib.parse import unquote, urlparse
 
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
-from airflow.utils.decorators import apply_defaults
-
-REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+"""Operators that integrates with Google Cloud Build service."""
 
+from typing import Dict, Optional, Sequence, Tuple, Union
 
-class BuildProcessor:
-    """
-    Processes build configurations to add additional functionality to support the use of operators.
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
+from google.protobuf.json_format import MessageToDict
 
-    The following improvements are made:
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook  # noqa
+from airflow.utils.decorators import apply_defaults
 
-    * It is required to provide the source and only one type can be given,
-    * It is possible to provide the source as the URL address instead dict.
 
-    :param body: The request body.
-        See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
-    :type body: dict
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id_: The ID of the build.
+    :type id_: str
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+
+    :rtype: dict
     """
-    def __init__(self, body: Dict) -> None:
-        self.body = deepcopy(body)
-
-    def _verify_source(self):
-        is_storage = "storageSource" in self.body["source"]
-        is_repo = "repoSource" in self.body["source"]
-
-        sources_count = sum([is_storage, is_repo])
 
-        if sources_count != 1:
-            raise AirflowException(
-                "The source could not be determined. Please choose one data source from: "
-                "storageSource and repoSource."
-            )
+    template_fields = ("project_id", "id_", "gcp_conn_id")
 
-    def _reformat_source(self):
-        self._reformat_repo_source()
-        self._reformat_storage_source()
+    @apply_defaults
+    def __init__(
+        self,
+        id_: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.id_ = id_
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
 
-    def _reformat_repo_source(self):
-        if "repoSource" not in self.body["source"]:
-            return
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id)
+        result = hook.cancel_build(
+            id_=self.id_,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return MessageToDict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
 
-        source = self.body["source"]["repoSource"]
+    :param build: The build resource to create. If a dict is provided, it must be of the same form
+        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+        If set to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+
+    :rtype: dict
+    """
 
-        if not isinstance(source, str):
-            return
+    template_fields = ("project_id", "build", "gcp_conn_id")
 
-        self.body["source"]["repoSource"] = self._convert_repo_url_to_dict(source)
+    @apply_defaults
+    def __init__(
+        self,
+        build: Union[Dict, Build],

Review comment:
       I was mainly worried about the lack of this logic:
   https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/cloud_build.py#L32-L154
   It would be best to support both parameters and display a message when the old one is used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org