Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/26 19:54:55 UTC

[GitHub] [airflow] MaksYermak opened a new pull request #19248: Create dataproc serverless spark batches operator

MaksYermak opened a new pull request #19248:
URL: https://github.com/apache/airflow/pull/19248


   Create operator for working with Batches for Google Dataproc. Includes operators, hooks, example dags, tests and docs.
   
   Co-authored-by: Wojciech Januszek januszek@google.com
   Co-authored-by: Lukasz Wyszomirski wyszomirski@google.com
   Co-authored-by: Maksim Yermakou maksimy@google.com
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070762748


   ![image](https://user-images.githubusercontent.com/14033798/158788866-499b2201-8966-4ba0-a1c6-b50810add30f.png)
   I removed the main python file on purpose.
   
   ```python
   def get_week_ago(dt):
       bd = dt + relativedelta(days=-7)
       return bd.strftime('%Y-%m-%d')
   
   dag = DAG(
       dag_id=f"{table_name}_table_v4",
       ...
       user_defined_macros={'week_ago': get_week_ago},
   )
   
   
   create_batch = DataprocCreateBatchOperator(
       task_id="create_batch",
       project_id=project_id,
       region=f"{{{{ var.value.region_{environment} }}}}",
       batch=Batch(
           pyspark_batch=PySparkBatch(
               main_python_file_uri=main_python_file_uri,
               python_file_uris=files,
               args=[
                   "--start-date",
                   "{{ week_ago(execution_date) }}",  # it doesn't work
                   "--end-date",
                   "{{ ds }}",  # it doesn't work
                   "--table",
                   table_name,
                   "--source-bucket",
                   source_bucket,
                   "--source-prefix",
                   source_prefix,
                   "--output-path",
                   f"{output_path}/{table_name}/",
               ],
           ),
           runtime_config=RuntimeConfig(container_image="gcr.io/company/image_name:1.0.1"),
       ),
       batch_id=f"pyspark-table-{table_name.replace('_','-')}-{{{{ ds_nodash }}}}",
       retry=DEFAULT,
       timeout=500,
   )
   ```
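
The `batch_id` in the snippet above is assembled from `table_name` and `ds_nodash`. The operator docstring in this PR says the value must be 4-63 characters drawn from `[a-z]`, `[0-9]`, and `-`, so it may be worth checking a generated ID once the template has been rendered. A minimal sketch of that check, based only on the constraint quoted in the docstring (the service may enforce more); `is_valid_batch_id` and `make_batch_id` are hypothetical helper names, not part of the provider:

```python
import re

# Constraint taken from the operator docstring in this PR:
# 4-63 characters, each one of a-z, 0-9, or "-".
_BATCH_ID_RE = re.compile(r"[a-z0-9-]{4,63}")


def is_valid_batch_id(batch_id: str) -> bool:
    """Return True if batch_id satisfies the documented length and charset."""
    return _BATCH_ID_RE.fullmatch(batch_id) is not None


def make_batch_id(table_name: str, ds_nodash: str) -> str:
    """Hypothetical helper mirroring the f-string used in the DAG snippet."""
    return f"pyspark-table-{table_name.replace('_', '-')}-{ds_nodash}"
```

Note that a table name containing uppercase letters would produce an ID that fails this check, since the helper only replaces underscores.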





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1063727047


   @aoelvp94 I'm not an expert on the Dataproc serverless service, but the operator is only a wrapper for the SDK and we take the same parameters as we have there. So in the `Batch` definition we have `runtimeConfig`, where you can specify the image. I hope this helps.
   
   Refs:
   Batch -> https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch
   runtimeConfig -> https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#RuntimeConfig





[GitHub] [airflow] josh-fell commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r738479770



##########
File path: airflow/providers/google/cloud/example_dags/example_dataproc.py
##########
@@ -252,3 +263,40 @@
 
     # Task dependency created via `XComArgs`:
     #   spark_task_async >> spark_task_async_sensor
+
+with models.DAG(
+    "example_gcp_batch_dataproc",
+    schedule_interval='@once',
+    start_date=days_ago(1),

Review comment:
       ```suggestion
       "example_gcp_batch_dataproc",
       schedule_interval='@once',
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ```
   
   There is an almost-finished effort to transition away from using `start_date=days_ago(n)` in example DAGs to using a static value as best practice. New example DAGs should follow the static start_date approach. The value used doesn't matter as long as it's static.
   
   Also adding `catchup=False` has been recently discussed for all example DAGs to help with any accidental DAG run explosions if new users mutate the `schedule_interval` without fully understanding `catchup=True` is the default.
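
To make the "DAG run explosion" concrete: with a static `start_date` in the past and `catchup=True`, the scheduler creates a run for each completed schedule interval between `start_date` and now (subject to limits such as `max_active_runs`). A rough count, as a simplified model that ignores Airflow's actual timetable logic; `count_catchup_runs` is a hypothetical helper:

```python
from datetime import datetime, timedelta


def count_catchup_runs(start_date: datetime, now: datetime, interval: timedelta) -> int:
    """Count fully elapsed intervals between start_date and now.

    Simplified model: one run per completed interval.
    """
    if now <= start_date:
        return 0
    return (now - start_date) // interval


# A user changes the example from '@once' to a daily schedule,
# ten days after the static start_date.
runs = count_catchup_runs(datetime(2021, 1, 1), datetime(2021, 1, 11), timedelta(days=1))
```

With `catchup=False`, only the most recent interval would be scheduled instead of the whole backlog.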







[GitHub] [airflow] turbaszek commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r739460404



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       Is there a chance that this response will be bigger than default allowed XCom size? I've never thought about how operators should handle such cases. @potiuk @mik-laj WDYT?
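
One way to reason about this concern is to measure the serialized size of the result before returning it. A minimal sketch, assuming the return value is JSON-serializable and that the deployment has some byte limit; `MAX_XCOM_BYTES`, `serialized_size`, and `largest_prefix_that_fits` are hypothetical names, and the limit value is a placeholder rather than an Airflow constant:

```python
import json
from typing import Any, List

MAX_XCOM_BYTES = 48 * 1024  # placeholder limit, an assumption for illustration


def serialized_size(value: Any) -> int:
    """Size in bytes of the UTF-8 encoded JSON form of value."""
    return len(json.dumps(value).encode("utf-8"))


def largest_prefix_that_fits(items: List[dict], max_bytes: int = MAX_XCOM_BYTES) -> List[dict]:
    """Return the longest leading slice of items whose JSON form fits in max_bytes."""
    kept: List[dict] = []
    for item in items:
        # Stop as soon as adding the next item would exceed the limit.
        if serialized_size(kept + [item]) > max_bytes:
            break
        kept.append(item)
    return kept
```

Whether truncating, failing the task, or relying on `page_size` is the right behaviour is a design question this sketch does not answer.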







[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070656799


   `batch` when I use `Batch` / `PySparkBatch` object





[GitHub] [airflow] MaksYermak removed a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak removed a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070964190


   @aoelvp94 I have checked your configuration. For it to work correctly, you should use a dictionary instead of a `Batch()` object, because for some reason Airflow can't template an object's properties.
   
   `{
               "pyspark_batch": {
                   "main_python_file_uri": main_python_file_uri,
                   "python_file_uris": files,
                   "args": [
                       "--start-date={{ get_week_ago(data_interval_start) }}",
                       "--end-date={{ ds }}",
                       (. . .)
                   ],
               },
              ( . . .)
   }`
   One more thing: in recent Airflow versions, `execution_date` in templates is deprecated. Please use `data_interval_start` or `logical_date` instead.
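
The difference between passing a dictionary and passing a `Batch()` object can be illustrated with a toy renderer. This is a simplified stand-in for recursive template rendering, not Airflow's implementation: it substitutes `{{ name }}` placeholders in strings, recurses into dicts and lists, and returns any other object unchanged, so strings held inside an opaque object stay unrendered. `render`, `OpaqueBatch`, and the context values are all hypothetical:

```python
import re
from typing import Any, Dict

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, str]) -> Any:
    """Render {{ name }} placeholders in strings, recursing into dicts and lists."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: context[m.group(1)], value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    if isinstance(value, list):
        return [render(item, context) for item in value]
    # Anything else (for example a message object) is passed through untouched.
    return value


class OpaqueBatch:
    """Stand-in for an object whose attributes the renderer does not inspect."""

    def __init__(self, args):
        self.args = args


context = {"ds": "2022-03-17"}
as_dict = render({"pyspark_batch": {"args": ["--end-date={{ ds }}"]}}, context)
as_object = render(OpaqueBatch(["--end-date={{ ds }}"]), context)
```

In this model `as_dict` has the date substituted, while `as_object.args` still contains the raw `{{ ds }}` string.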





[GitHub] [airflow] potiuk closed pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #19248:
URL: https://github.com/apache/airflow/pull/19248


   





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-968741348


   @turbaszek @josh-fell @mik-laj hi guys, could you take another look at this PR?





[GitHub] [airflow] josh-fell commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r737561941



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,311 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """

Review comment:
       Missing `gcp_conn_id` and `impersonation_chain` from the docstring.








[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-975403342


   @turbaszek @josh-fell @mik-laj Is there a chance to merge this?





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070964190









[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957744160


   > @MaksYermak General thoughts across the new operators:
   > 
   > * WDYT about adding `batch_id` as a `template_field`? Would it be beneficial for users to be able to dynamically create `batch_id` values?
   > * Adjacently, is `batch_id` something the `DataprocCreateBatchOperator` can push as an `XCom` such that the value can be used in downstream tasks as an `XComArgs` (i.e. the use case in the example DAG if the `.output` property is used as an input to the `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` tasks)?
   > 
   > Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful. I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.
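
Since the operator returns the batch as a dictionary, a downstream task could still recover the ID from it. A minimal sketch, assuming the returned dictionary carries a `name` field of the form `projects/<project>/locations/<region>/batches/<batch_id>` (the resource-name pattern described in the Dataproc REST reference); `batch_id_from_result` is a hypothetical helper, not part of this PR:

```python
from typing import Mapping


def batch_id_from_result(result: Mapping[str, object]) -> str:
    """Extract the final path component of the batch resource name."""
    name = str(result["name"])
    # rpartition returns ("", "", name) when the separator is absent.
    _prefix, sep, batch_id = name.rpartition("/batches/")
    if not sep or not batch_id:
        raise ValueError(f"Unexpected batch resource name: {name!r}")
    return batch_id
```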





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-971395954


   > @MaksYermak Should `batch_id` be added to `template_fields` in `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` as well? No strong opinion though.
   > 
   > LGTM 👍
   
   @josh-fell makes sense, I have added it to the code.





[GitHub] [airflow] josh-fell commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r738479770



##########
File path: airflow/providers/google/cloud/example_dags/example_dataproc.py
##########
@@ -252,3 +263,40 @@
 
     # Task dependency created via `XComArgs`:
     #   spark_task_async >> spark_task_async_sensor
+
+with models.DAG(
+    "example_gcp_batch_dataproc",
+    schedule_interval='@once',
+    start_date=days_ago(1),

Review comment:
       ```suggestion
       "example_gcp_batch_dataproc",
       schedule_interval='@once',
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ```
   
   There is an almost-finished effort to transition away from using `start_date=days_ago(n)` in example DAGs to using a static `datetime(...)` value as best practice. New example DAGs should follow the static `start_date` approach. The value used doesn't matter as long as it's static.
   
   Also, adding `catchup=False` to all example DAGs was recently discussed, to prevent accidental DAG run explosions when new users change the `schedule_interval` without realizing that `catchup=True` is the default.
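
A rough way to see the "DAG run explosion" is to count the schedule intervals that have elapsed since a static `start_date`. The sketch below is a toy model, not Airflow's scheduler logic: `runs_to_schedule` is a hypothetical function that assumes one run per completed interval when catchup is on, and at most the latest run when it is off.

```python
from datetime import datetime, timedelta


def runs_to_schedule(start_date: datetime, now: datetime,
                     interval: timedelta, catchup: bool) -> int:
    """Toy estimate of how many runs get created when a DAG is first enabled."""
    # Number of fully elapsed intervals between start_date and now.
    completed = max((now - start_date) // interval, 0)
    # With catchup, every missed interval gets a run; without it, only the latest.
    return completed if catchup else min(completed, 1)


start = datetime(2021, 1, 1)   # static start_date, as in the suggestion
now = datetime(2021, 1, 11)    # assumed moment the DAG is enabled

daily_with_catchup = runs_to_schedule(start, now, timedelta(days=1), catchup=True)
daily_without_catchup = runs_to_schedule(start, now, timedelta(days=1), catchup=False)
hourly_with_catchup = runs_to_schedule(start, now, timedelta(hours=1), catchup=True)
```

Under these assumptions, switching the same DAG from a daily to an hourly interval multiplies the backlog by 24, which is the situation `catchup=False` is meant to avoid.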







[GitHub] [airflow] MaksYermak edited a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak edited a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957744160


   > @MaksYermak General thoughts across the new operators:
   > 
   > * WDYT about adding `batch_id` as a `template_field`? Would it be beneficial for users to be able to dynamically create `batch_id` values?
   > * Adjacently, is `batch_id` something the `DataprocCreateBatchOperator` can push as an `XCom` such that the value can be used in downstream tasks as an `XComArgs` (i.e. the use case in the example DAG if the `.output` property is used as an input to the `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` tasks)?
   > 
   > Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.





[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r739344873



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,311 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """

Review comment:
       @josh-fell I've added it 







[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070664143


   Can you share the Batch config?





[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1064013644


   Yeah @lwyszomi, I am trying something like that right now. If I have news, I will leave the code snippet here for future reference.





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-977749514


   > @MaksYermak @lwyszomi There is a static check that is failing. Can you address this?
   > 
   > FYI - I am not able to merge. A code owner will have to do that.
   
   @josh-fell I have fixed it





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070662725


   @aoelvp94 `batch` is templated, so it should work. Which property inside `Batch`/`PySparkBatch` are you trying to template?





[GitHub] [airflow] MaksYermak edited a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak edited a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070964190


   @aoelvp94 I have checked your configuration. For it to work correctly, you should use a dictionary instead of a `Batch()` object, because for some reason Airflow can't template an object's properties.
   
   `{
               "pyspark_batch": {
                   "main_python_file_uri": main_python_file_uri,
                   "python_file_uris": files,
                   "args": [
                       "--start-date={{ get_week_ago(data_interval_start) }}",
                       "--end-date={{ ds }}",
                       (. . .)
                   ],
               },
              ( . . .)
   }`
   One more thing: in recent Airflow versions, the `execution_date` template variable is deprecated. Please use `data_interval_start` or `logical_date` instead.
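
To illustrate the dictionary advice above: a renderer that only walks strings and built-in containers will reach a placeholder nested in a dict but will pass an arbitrary object through untouched. The sketch below is a simplified stand-in, not Airflow's actual rendering code; `render` and `OpaqueBatch` are hypothetical names, and the plain string substitution only mimics Jinja for the single `{{ ds }}` placeholder.

```python
from typing import Any


def render(value: Any, context: dict) -> Any:
    """Toy renderer: fills `{{ key }}` in strings and recurses into containers."""
    if isinstance(value, str):
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", str(replacement))
        return value
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    # Anything else is treated as opaque and returned unchanged.
    return value


class OpaqueBatch:
    """Hypothetical stand-in for an object whose attributes are not traversed."""

    def __init__(self, args):
        self.args = args


context = {"ds": "2022-03-17"}  # assumed run date for the example

as_dict = {"pyspark_batch": {"args": ["--end-date={{ ds }}"]}}
as_object = OpaqueBatch(args=["--end-date={{ ds }}"])

rendered_dict = render(as_dict, context)      # placeholder is substituted
rendered_object = render(as_object, context)  # placeholder survives as-is
```

In this model the dict form ends up with the date filled in, while the object form still carries the literal `{{ ds }}` string, which matches the behavior described in the comment.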





[GitHub] [airflow] MaksYermak removed a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak removed a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070964190





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1066681008


   
   
   
   
   > > @aoelvp94 which version of the provider you are using? The Batch object is templated field starting from 6.4.0
   > 
   > I am using `composer-2.0.5-airflow-2.2.3` so I have 6.4.0
   
   I will need to investigate further why the Jinja params don't work. I checked, and you are right that Composer 2.0.5 uses 6.4.0.
   
   > Another question, why the operator don't generate a hash in `batch_id` field as happens in `DataprocSubmitPySparkJobOperator` (I am replacing this implementation)? I have to delete the batch to run again the same batch (I prefer having the historical runs)
   
   We created the operators based on the existing SDK; we didn't add any extra logic to append a hash to the `batch_id`.
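
Since the operator passes `batch_id` through unchanged, a DAG author who wants to keep historical runs could generate a unique ID per run. The sketch below is one possible approach, not provider behavior: `make_batch_id` is a hypothetical helper that appends a random suffix and enforces the constraints quoted in the operator docstring (4-63 characters, lowercase letters, digits and hyphens).

```python
import re
import uuid

# Constraint taken from the operator docstring: 4-63 chars of [a-z0-9-].
_VALID_BATCH_ID = re.compile(r"^[a-z0-9-]{4,63}$")


def make_batch_id(prefix: str, suffix_len: int = 8) -> str:
    """Build a unique batch ID from a readable prefix plus a random hex suffix."""
    # Lowercase the prefix and replace every disallowed character with a hyphen.
    cleaned = re.sub(r"[^a-z0-9-]", "-", prefix.lower()).strip("-")
    suffix = uuid.uuid4().hex[:suffix_len]
    # Leave room for the separating hyphen and the suffix within 63 characters.
    head = cleaned[: 63 - suffix_len - 1].rstrip("-")
    batch_id = f"{head}-{suffix}" if head else suffix
    if not _VALID_BATCH_ID.match(batch_id):
        raise ValueError(f"generated batch_id is invalid: {batch_id!r}")
    return batch_id


example_id = make_batch_id("Weekly Report")
```

Because `batch_id` is a templated field, a deterministic alternative would be to derive the suffix from the run's logical date instead of a random value, so that retries of the same run reuse the same ID.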





[GitHub] [airflow] josh-fell commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-976717416


   @MaksYermak @lwyszomi There is a static check that is failing. Can you address this?
   
   FYI - I am not able to merge. A code owner will have to do that.





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-973834629


   @turbaszek @josh-fell @mik-laj hi guys, could you take a look and approve this PR for merge if all is good?





[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741282891



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       @turbaszek I've fixed it

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")

Review comment:
       @turbaszek I've fixed it







[GitHub] [airflow] marekw-openx commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
marekw-openx commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070958182


   @aoelvp94, have you tried passing the `batch` config as a Python dict? In my case I'm using a plain Python dict and it gets templated, like:
   ```
   batch = {
     "spark_batch": {
       "spark_job": {
         "args": ["-d", "{{ ts }}"]
       }
     }
     (...)
   }
   ```
   
   Also, please double-check the apache-airflow-providers-google version (`batch` wasn't templated in 6.2.0).
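
To illustrate why a plain dict can be templated, here is a minimal sketch of recursive rendering over nested containers. It is not Airflow's implementation: the `render` function and the regex-based placeholder substitution are hypothetical stand-ins for Jinja rendering, and the `ts` value is made up. The point is only that strings nested inside dicts and lists can be reached by walking the structure, while an opaque object is passed through unchanged.

```python
import re
from typing import Any, Dict

# Hypothetical stand-in for Jinja: replaces "{{ name }}" with context[name].
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, str]) -> Any:
    """Recursively render placeholders in strings nested in dicts, lists and tuples."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    # Anything else (for example an opaque object) is returned untouched.
    return value


# Same shape as the dict in the comment above; the timestamp is illustrative.
batch = {"spark_batch": {"spark_job": {"args": ["-d", "{{ ts }}"]}}}
rendered = render(batch, {"ts": "2022-03-17T00:00:00+00:00"})
print(rendered)
```

An object whose attributes are not walked by the renderer comes back unchanged, which matches the behaviour reported in this thread for `Batch` objects.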





[GitHub] [airflow] aoelvp94 edited a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 edited a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1064779972


   @lwyszomi No success when I tried to set jinja params in the `Batch` object, do you know why?





[GitHub] [airflow] turbaszek commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r739459373



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")

Review comment:
       Inconsistent return statements. If the batch already exists, we should return the existing batch.
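
One way to make the return value consistent is sketched below. It is only a sketch: `FakeHook` and the local `AlreadyExists` class are hypothetical stand-ins (for `DataprocHook` and `google.api_core.exceptions.AlreadyExists`) so that the snippet runs without Google Cloud, and `create_or_get_batch` is an illustrative name, not code from this PR.

```python
from typing import Dict


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


class FakeHook:
    """Hypothetical in-memory hook; it only mimics create/get for this sketch."""

    def __init__(self) -> None:
        self._batches: Dict[str, dict] = {}

    def create_batch(self, batch_id: str, batch: dict) -> dict:
        if batch_id in self._batches:
            raise AlreadyExists(batch_id)
        self._batches[batch_id] = dict(batch, name=batch_id)
        return self._batches[batch_id]

    def get_batch(self, batch_id: str) -> dict:
        return self._batches[batch_id]


def create_or_get_batch(hook: FakeHook, batch_id: str, batch: dict) -> dict:
    """Always return a batch: the new one, or the existing one on a conflict."""
    try:
        result = hook.create_batch(batch_id=batch_id, batch=batch)
    except AlreadyExists:
        # Fall back to fetching the batch that is already there.
        result = hook.get_batch(batch_id=batch_id)
    return result


hook = FakeHook()
first = create_or_get_batch(hook, "batch-0001", {"labels": {"env": "dev"}})
second = create_or_get_batch(hook, "batch-0001", {"labels": {"env": "prod"}})
print(first == second)
```

With this shape every code path returns a value, so downstream tasks get the same kind of result whether or not the batch was newly created.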







[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r739346928



##########
File path: airflow/providers/google/cloud/example_dags/example_dataproc.py
##########
@@ -252,3 +263,40 @@
 
     # Task dependency created via `XComArgs`:
     #   spark_task_async >> spark_task_async_sensor
+
+with models.DAG(
+    "example_gcp_batch_dataproc",
+    schedule_interval='@once',
+    start_date=days_ago(1),

Review comment:
       @josh-fell I've changed this code to the new approach







[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957741669


   > * dynamically create `batch_id` values
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.





[GitHub] [airflow] MaksYermak removed a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak removed a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957741669


   > * dynamically create `batch_id` values
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.





[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741282891



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       @turbaszek I've fixed it







[GitHub] [airflow] MaksYermak edited a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak edited a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957744160


   > @MaksYermak General thoughts across the new operators:
   > 
   > * WDYT about adding `batch_id` as a `template_field`? Would it be beneficial for users to be able to dynamically create `batch_id` values?
   > * Adjacently, is `batch_id` something the `DataprocCreateBatchOperator` can push as an `XCom` such that the value can be used in downstream tasks as an `XComArgs` (i.e. the use case in the example DAG if the `.output` property is used as an input to the `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` tasks)?
   > 
   > Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.
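
For readers who still want the ID downstream, a possible workaround is to derive it from the returned dict. The sketch below assumes the dict has a `name` key of the form `projects/<project>/locations/<region>/batches/<batch_id>`; the docstrings in this PR only say that `batch_id` is the final component of the resource name, so the exact layout is an assumption. The helper name `batch_id_from_name` and the sample values are hypothetical.

```python
def batch_id_from_name(name: str) -> str:
    """Return the last path component of a batch resource name."""
    parts = name.split("/")
    # Assumed layout: projects/<project>/locations/<region>/batches/<batch_id>
    expected = len(parts) == 6 and parts[0::2] == ["projects", "locations", "batches"]
    if not expected or not parts[5]:
        raise ValueError(f"Unexpected batch resource name: {name!r}")
    return parts[5]


# Hypothetical dict, shaped like the value returned by Batch.to_dict(...).
batch = {"name": "projects/my-project/locations/europe-west1/batches/batch-0001"}
batch_id = batch_id_from_name(batch["name"])
print(batch_id)
```

A downstream task could apply such a helper to the XCom value pushed by the create task, at the cost of depending on the resource name format.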





[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1064779972


   @lwyszomi No success when I tried to set jinja params in the `Batch` object, do you know why?





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1066447762


   @aoelvp94 which version of the provider are you using? The Batch object is a templated field starting from 6.4.0.





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070971125


   @aoelvp94 I have checked your configuration. For it to work correctly, you should use a dictionary instead of a `Batch()` object, because for some reason Airflow can't template an object's properties.
   
   ```
   batch = {
       "pyspark_batch": {
           "main_python_file_uri": main_python_file_uri,
           "python_file_uris": files,
           "args": [
               "--start-date={{ get_week_ago(data_interval_start) }}",
               "--end-date={{ ds }}",
               (. . .)
           ],
       },
       ( . . .)
   }
   ```
   
   One more thing: in recent Airflow versions the `execution_date` template variable is deprecated. Please use `data_interval_start` or `logical_date` instead.
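
The `get_week_ago` helper used in the template above is not shown in this thread. The sketch below is a guess at what such a helper might look like, assuming it returns the date seven days earlier as `YYYY-MM-DD`; in a DAG it would presumably be exposed to templates through the DAG's `user_defined_macros` argument. The sample date is made up.

```python
from datetime import datetime, timedelta, timezone


def get_week_ago(moment: datetime) -> str:
    """Return the date seven days before `moment`, formatted as YYYY-MM-DD."""
    return (moment - timedelta(days=7)).strftime("%Y-%m-%d")


# Illustrative value standing in for the `data_interval_start` template variable.
data_interval_start = datetime(2022, 3, 17, tzinfo=timezone.utc)
start_date_arg = f"--start-date={get_week_ago(data_interval_start)}"
print(start_date_arg)
```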





[GitHub] [airflow] josh-fell commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-953935223


   @MaksYermak General thoughts across the new operators:
   - WDYT about adding `batch_id` as a `template_field`? Could it be beneficial for users to be able to dynamically create `batch_id` values?
   - Adjacently, is `batch_id` something the `DataprocCreateBatchOperator` can push as an `XCom` such that the value can be used in downstream tasks as an `XComArgs` (i.e. the use case in the example DAG if the `.output` property is used as an input to the `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` tasks)? 
   
   Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.
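
As a rough illustration of the first point, the sketch below shows what listing `batch_id` in `template_fields` would buy a user. `SketchOperator` and its `render_template_fields` method are hypothetical simplifications written for this example, not Airflow's `BaseOperator`; only attributes named in `template_fields` are rendered.

```python
import re
from typing import Dict

# Matches simple placeholders such as {{ ds_nodash }}.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class SketchOperator:
    """Hypothetical operator: only attributes named in template_fields are rendered."""

    template_fields = ("region", "project_id", "batch_id")

    def __init__(self, region: str, project_id: str, batch_id: str, gcp_conn_id: str) -> None:
        self.region = region
        self.project_id = project_id
        self.batch_id = batch_id
        self.gcp_conn_id = gcp_conn_id  # not listed above, so never rendered

    def render_template_fields(self, context: Dict[str, str]) -> None:
        for name in self.template_fields:
            raw = getattr(self, name)
            if isinstance(raw, str):
                rendered = _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw)
                setattr(self, name, rendered)


op = SketchOperator(
    region="europe-west1",
    project_id="my-project",
    batch_id="batch-{{ ds_nodash }}",
    gcp_conn_id="conn-{{ ds_nodash }}",
)
op.render_template_fields({"ds_nodash": "20211029"})
print(op.batch_id)     # rendered, because batch_id is in template_fields
print(op.gcp_conn_id)  # unchanged, because gcp_conn_id is not
```

With `batch_id` templated, each DAG run could derive a distinct ID from its own context instead of reusing a fixed string.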








[GitHub] [airflow] MaksYermak removed a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak removed a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957741669


   > * dynamically create `batch_id` values
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.
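
Since the operator's docstring says the batch ID becomes the final component of the batch's resource name, a downstream task could in principle recover the ID from the returned `Batch` dictionary. The following is a minimal sketch under that assumption; `batch_id_from_result` is a hypothetical helper, and the resource-name layout in the example value is illustrative only.

```python
from typing import Any, Dict


def batch_id_from_result(batch: Dict[str, Any]) -> str:
    """Return the last path component of the batch's resource name."""
    name = batch.get("name", "")
    if not name:
        raise ValueError("batch dict has no 'name' field")
    return name.rsplit("/", 1)[-1]


# Illustrative value only; the exact resource-name layout is an assumption.
example = {"name": "projects/my-project/locations/europe-west1/batches/my-batch-0001"}
print(batch_id_from_result(example))
```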





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957741669









[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741282891



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       @turbaszek I've fixed it.
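
The docstrings in the diff above state that `batch_id` must be 4-63 characters long and may contain lowercase letters, digits and hyphens. A client-side check for that rule could look like the sketch below. The helper `is_valid_batch_id` is hypothetical and is not part of the PR, and the regular expression is one reading of the docstring; the service itself may apply stricter rules.

```python
import re

# One reading of "4-63 characters; valid characters are [a-z], [0-9] and '-'".
_BATCH_ID = re.compile(r"[a-z0-9-]{4,63}")


def is_valid_batch_id(batch_id: str) -> bool:
    """Return True if batch_id satisfies the documented length and character rules."""
    return _BATCH_ID.fullmatch(batch_id) is not None


print(is_valid_batch_id("my-batch-01"))  # lowercase, digits, hyphens, length 11
print(is_valid_batch_id("abc"))          # too short
print(is_valid_batch_id("My-Batch"))     # uppercase letters not allowed
```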

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")

Review comment:
       @turbaszek I've fixed it.
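
In the hunk above, the `except AlreadyExists` branch only logs a message, so `execute` returns `None` when the batch already exists. One possible alternative, sketched here purely as an assumption and not as what the PR implements, is to fall back to fetching the existing batch. `FakeHook`, the local `AlreadyExists` class and `create_or_get` are hypothetical stand-ins that exist only to make the sketch runnable.

```python
from typing import Dict


class AlreadyExists(Exception):
    """Local stand-in for google.api_core.exceptions.AlreadyExists."""


class FakeHook:
    """Hypothetical in-memory hook, used only to make the sketch runnable."""

    def __init__(self) -> None:
        self._batches: Dict[str, dict] = {}

    def create_batch(self, batch_id: str, batch: dict) -> dict:
        if batch_id in self._batches:
            raise AlreadyExists(batch_id)
        self._batches[batch_id] = {"name": f"batches/{batch_id}", **batch}
        return self._batches[batch_id]

    def get_batch(self, batch_id: str) -> dict:
        return self._batches[batch_id]


def create_or_get(hook: FakeHook, batch_id: str, batch: dict) -> dict:
    """Create the batch, or return the existing one if the ID is already taken."""
    try:
        return hook.create_batch(batch_id=batch_id, batch=batch)
    except AlreadyExists:
        # Fall back to the existing batch so the caller never receives None.
        return hook.get_batch(batch_id=batch_id)


hook = FakeHook()
first = create_or_get(hook, "my-batch", {"labels": {}})
second = create_or_get(hook, "my-batch", {"labels": {"ignored": "yes"}})
print(first == second)  # the second call returns the batch created by the first
```

Whether returning the existing batch is the right behaviour depends on how retries and reruns should be treated; the sketch only shows the mechanics.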







[GitHub] [airflow] github-actions[bot] commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-980320715


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-980431376


   Awesome work, congrats on your first merged pull request!
   








[GitHub] [airflow] josh-fell commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-970380590


   @MaksYermak Should `batch_id` be added to `template_fields` in `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` as well? No strong opinion though.
   
   LGTM 👍 





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070767209


   @aoelvp94 thanks, we will check why this does not work. I will get back to you with an update when I have more information.





[GitHub] [airflow] lwyszomi commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070643969


   @aoelvp94 for which property do you want to use a Jinja template?





[GitHub] [airflow] MaksYermak commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070964190


   @aoelvp94 I have checked your configuration. For it to work correctly, you should use a dictionary instead of a `Batch()` object, because for some reason Airflow can't template an object's properties.
   
   `{
               "pyspark_batch": {
                   "main_python_file_uri": main_python_file_uri,
                   "python_file_uris": files,
                   "args": [
                       "--start-date={{ get_week_ago(data_interval_start) }}",
                       "--end-date={{ ds }}",
                       (. . .)
                   ],
               },
              ( . . .)
   }`
   One more thing: in recent Airflow versions the 'execution_date' template variable is deprecated. Please use 'data_interval_start' or 'logical_date' instead.








[GitHub] [airflow] potiuk commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r757668085



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")
+
+    def on_kill(self):
+        if self.operation:
+            self.operation.cancel()
+
+
+class DataprocDeleteBatchOperator(BaseOperator):
+    """
+    Deletes the batch workload resource.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting batch: %s", self.batch_id)
+        hook.delete_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Batch deleted.")
+
+
+class DataprocGetBatchOperator(BaseOperator):
+    """
+    Gets the batch workload resource representation.
+
+    :param batch_id: Required. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        batch_id: str,
+        region: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.batch_id = batch_id
+        self.region = region
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Getting batch: %s", self.batch_id)
+        batch = hook.get_batch(
+            batch_id=self.batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Batch.to_dict(batch)
+
+
+class DataprocListBatchesOperator(BaseOperator):
+    """
+    Lists batch workloads.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param page_size: Optional. The maximum number of batches to return in each response. The service may
+        return fewer than this value. The default page size is 20; the maximum page size is 1000.
+    :type page_size: int
+    :param page_token: Optional. A page token received from a previous ``ListBatches`` call.
+        Provide this token to retrieve the subsequent page.
+    :type page_token: str
+    :param retry: Optional, a retry object used  to retry requests. If `None` is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("region", "project_id", "impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        results = hook.list_batches(
+            region=self.region,
+            project_id=self.project_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Batch.to_dict(result) for result in results]

Review comment:
       I think this is not a problem: such a response is unlikely to exceed the limit, and even if it does, we are much more relaxed about XCom size now, since custom XCom backends can handle arbitrarily large data if needed.







[GitHub] [airflow] MaksYermak removed a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak removed a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957741669


   > * dynamically create `batch_id` values
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.





[GitHub] [airflow] MaksYermak commented on a change in pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741283461



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
         )
         operation.result()
         self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+    """
+    Creates a batch workload.
+
+    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param batch: Required. The batch to create.
+    :type batch: google.cloud.dataproc_v1.types.Batch
+    :param batch_id: Optional. The ID to use for the batch, which will become the final component
+        of the batch's resource name.
+        This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+    :type batch_id: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
+        the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'region',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str = None,
+        project_id: str,
+        batch: Union[Dict, Batch],
+        batch_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = "",
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.batch = batch
+        self.batch_id = batch_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.operation: Optional[operation.Operation] = None
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating batch")
+        try:
+            self.operation = hook.create_batch(
+                region=self.region,
+                project_id=self.project_id,
+                batch=self.batch,
+                batch_id=self.batch_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            result = hook.wait_for_operation(self.timeout, self.operation)
+            self.log.info("Batch %s created", self.batch_id)
+            return Batch.to_dict(result)
+        except AlreadyExists:
+            self.log.info("Batch with given id already exists")

Review comment:
       @turbaszek I've fixed it







[GitHub] [airflow] MaksYermak edited a comment on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
MaksYermak edited a comment on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-957744160


   > @MaksYermak General thoughts across the new operators:
   > 
   > * WDYT about adding `batch_id` as a `template_field`? Would it be beneficial for users to be able to dynamically create `batch_id` values?
   > * Adjacently, is `batch_id` something the `DataprocCreateBatchOperator` can push as an `XCom` such that the value can be used in downstream tasks as an `XComArgs` (i.e. the use case in the example DAG if the `.output` property is used as an input to the `DataprocGetBatchOperator` and `DataprocDeleteBatchOperator` tasks)?
   > 
   > Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.
   
   @josh-fell answers to your questions:
   
   1. I think it can be useful; I will add it to the code.
   2. No, because `DataprocCreateBatchOperator` returns a `Batch` object, not a `batch_id`.
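
The second answer matches the diff, where `execute` returns `Batch.to_dict(result)`, so a downstream task receives the whole batch as a dict rather than a bare ID. A minimal sketch of how a downstream callable might recover the ID from that dict, assuming the `name` field has the form `projects/<project>/locations/<region>/batches/<batch_id>`. The helper name `batch_id_from_name` is hypothetical and not part of the provider:

```python
def batch_id_from_name(batch: dict) -> str:
    """Return the trailing batch ID from a Batch dict's resource name.

    Assumption: ``batch["name"]`` looks like
    ``projects/<project>/locations/<region>/batches/<batch_id>``.
    """
    name = batch["name"]
    parts = name.split("/")
    well_formed = (
        len(parts) == 6
        and parts[0] == "projects"
        and parts[2] == "locations"
        and parts[4] == "batches"
        and parts[5] != ""
    )
    if not well_formed:
        raise ValueError(f"Unexpected batch resource name: {name!r}")
    return parts[5]


# Placeholder values, for illustration only.
example = {"name": "projects/my-project/locations/europe-west1/batches/batch-0001"}
print(batch_id_from_name(example))
```

Failing loudly on an unexpected name keeps a malformed XCom value from being passed on silently to a delete or get task.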





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-952780782


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] potiuk commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-980431313


   Just intermittent errors. Merging.





[GitHub] [airflow] potiuk merged pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #19248:
URL: https://github.com/apache/airflow/pull/19248


   





[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1066661959


   Another question: why doesn't the operator generate a hash in the `batch_id` field, as happens in `DataprocSubmitPySparkJobOperator` (the implementation I am replacing)? I have to delete the batch before I can run the same batch again (I would prefer to keep the historical runs).





[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1066659465


   > @aoelvp94 which version of the provider are you using? The Batch object is a templated field starting from 6.4.0
   
   I am using `composer-2.0.5-airflow-2.2.3` so I have 6.4.0





[GitHub] [airflow] aoelvp94 commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
aoelvp94 commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1063566921


   A question here: how can I specify the Docker image to run PySpark workloads in custom containers using `DataprocCreateBatchOperator`?
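
A hedged sketch of one way this could look: to the best of my knowledge the Dataproc `Batch` message carries a `runtime_config` section with a `container_image` field, so the image can be supplied inside the `batch` dict that the operator accepts. The helper name and all URIs below are placeholders, and the field names should be checked against the `Batch` type reference for the installed client version.

```python
from typing import Dict, List, Optional


def pyspark_batch_with_image(
    main_python_file_uri: str,
    container_image: str,
    args: Optional[List[str]] = None,
) -> Dict:
    """Build a ``batch`` dict for a PySpark workload running in a custom image.

    Assumption: ``runtime_config.container_image`` is the field that selects
    the custom container for the batch.
    """
    return {
        "pyspark_batch": {
            "main_python_file_uri": main_python_file_uri,
            "args": list(args or []),
        },
        "runtime_config": {
            "container_image": container_image,
        },
    }


# Placeholder values, for illustration only.
batch = pyspark_batch_with_image(
    main_python_file_uri="gs://my-bucket/jobs/main.py",
    container_image="gcr.io/my-project/my-spark-image:1.0",
    args=["-d", "2022-03-10"],
)
print(batch["runtime_config"]["container_image"])
```

The resulting dict would then be passed as the `batch` argument of `DataprocCreateBatchOperator`, which accepts either a dict or a `Batch` object according to the signature in the diff.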





[GitHub] [airflow] marekw-openx commented on pull request #19248: Create dataproc serverless spark batches operator

Posted by GitBox <gi...@apache.org>.
marekw-openx commented on pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#issuecomment-1070958182


   @aoelvp94, have you tried passing the `batch` config as a Python dict? In my case I'm using a plain Python dict and it gets templated, like:
   ```
   batch = {
     "spark_batch": {
       "spark_job": {
         "args": ["-d", "{{ ts }}"]
       }
     }
     (...)
   }
   ```
   
   Also, please double-check the apache-airflow-providers-google version (`batch` wasn't templated in 6.2.0).
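
To illustrate why a nested dict works once `batch` is a templated field: template rendering walks into dicts and lists and renders every string it finds. The sketch below is a simplified, standard-library stand-in for that traversal. It is not Airflow's implementation (which uses Jinja); the `render` helper and its `{{ name }}` substitution are assumptions made for illustration.

```python
import re

# Matches simple placeholders such as "{{ ts }}" (no filters or expressions).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Recursively substitute ``{{ name }}`` placeholders in nested containers."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    return value  # numbers, None, etc. pass through unchanged


# Same shape as the snippet above; the timestamp is a placeholder value.
batch = {"spark_batch": {"spark_job": {"args": ["-d", "{{ ts }}"]}}}
rendered = render(batch, {"ts": "2022-03-17T00:00:00+00:00"})
print(rendered["spark_batch"]["spark_job"]["args"])
```

The strings inside the list nested three levels deep are rendered, while the structure of the dict is left untouched.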

