You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/02 21:26:19 UTC

[GitHub] [airflow] mik-laj commented on a diff in pull request #25302: Dataproc submit job operator async

mik-laj commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r936040468


##########
airflow/providers/google/cloud/hooks/dataproc.py:
##########
@@ -958,3 +973,736 @@ def list_batches(
             metadata=metadata,
         )
         return result
+
+
+class DataprocAsyncHook(GoogleBaseHook):
+    """
+    Asynchronous Hook for Google Cloud Dataproc APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(gcp_conn_id, delegate_to, impersonation_chain)
+
+    def get_cluster_client(self, region: Optional[str] = None) -> ClusterControllerAsyncClient:
+        """Returns ClusterControllerAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return ClusterControllerAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    def get_template_client(self, region: Optional[str] = None) -> WorkflowTemplateServiceAsyncClient:
+        """Returns WorkflowTemplateServiceAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return WorkflowTemplateServiceAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    def get_job_client(self, region: Optional[str] = None) -> JobControllerAsyncClient:
+        """Returns JobControllerAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return JobControllerAsyncClient(
+            credentials=self._get_credentials(),
+            client_info=CLIENT_INFO,
+            client_options=client_options,
+        )
+
+    def get_batch_client(self, region: Optional[str] = None) -> BatchControllerAsyncClient:
+        """Returns BatchControllerAsyncClient"""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return BatchControllerAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    async def create_cluster(
+        self,
+        region: str,
+        project_id: str,

Review Comment:
   This parameter (`project_id`) should default to `None`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org