Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/02 16:46:23 UTC

[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741282891



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the batch belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: Optional[str] = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID of the batch to delete, i.e. the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the batch belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID of the batch to retrieve, i.e. the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the batch belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Optional. The ID of the Google Cloud project that the batch belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used to retry requests. If ``None`` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       @turbaszek I've fixed it.
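
For readers following the thread, a minimal usage sketch of the new create operator. This is not part of the patch above: the DAG id, project, region, bucket path and batch id are placeholders, and the import path simply mirrors the module being patched (airflow/providers/google/cloud/operators/dataproc.py).

    # Illustrative sketch only, not part of the PR. Project, region, URIs and
    # the batch id are placeholders; the batch dict follows the
    # google.cloud.dataproc_v1.types.Batch shape referenced in the docstring.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    BATCH_CONFIG = {
        "pyspark_batch": {
            "main_python_file_uri": "gs://my-bucket/spark_job.py",
        },
    }

    with DAG(
        "dataproc_batch_example",
        start_date=datetime(2021, 11, 1),
        schedule_interval=None,
    ) as dag:
        create_batch = DataprocCreateBatchOperator(
            task_id="create_batch",
            project_id="my-project",
            region="europe-west1",
            batch=BATCH_CONFIG,
            batch_id="example-batch-001",  # 4-63 chars, [a-z][0-9]- only
        )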


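Since the constructor annotates batch as Union[Dict, Batch], the same payload could presumably also be passed as a proto object. A sketch, assuming the google-cloud-dataproc client exposes PySparkBatch alongside the Batch type already referenced in the docstrings:

    # Illustrative sketch only; assumes PySparkBatch is available in
    # google.cloud.dataproc_v1.types (the URI is a placeholder).
    from google.cloud.dataproc_v1.types import Batch, PySparkBatch

    batch_proto = Batch(
        pyspark_batch=PySparkBatch(main_python_file_uri="gs://my-bucket/spark_job.py"),
    )
    # batch_proto can then be passed as DataprocCreateBatchOperator(batch=batch_proto, ...)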


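And a second self-contained sketch covering the get, list and delete operators against an already-created batch, again with placeholder ids and the same assumed import path:

    # Illustrative sketch only; ids, project and region are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocDeleteBatchOperator,
        DataprocGetBatchOperator,
        DataprocListBatchesOperator,
    )

    with DAG(
        "dataproc_batch_inspect_example",
        start_date=datetime(2021, 11, 1),
        schedule_interval=None,
    ) as dag:
        get_batch = DataprocGetBatchOperator(
            task_id="get_batch",
            project_id="my-project",
            region="europe-west1",
            batch_id="example-batch-001",
        )
        list_batches = DataprocListBatchesOperator(
            task_id="list_batches",
            project_id="my-project",
            region="europe-west1",
            page_size=10,
        )
        delete_batch = DataprocDeleteBatchOperator(
            task_id="delete_batch",
            project_id="my-project",
            region="europe-west1",
            batch_id="example-batch-001",
        )
        get_batch >> list_batches >> delete_batch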