You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 20:05:38 UTC

[airflow] branch main updated: Make timeout Optional for wait_for_operation (#20981)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 86ef016  Make timeout Optional for wait_for_operation (#20981)
86ef016 is described below

commit 86ef016eabd90819163503ef07c0da50373142ad
Author: Maksim <ma...@google.com>
AuthorDate: Sun Jan 23 21:05:00 2022 +0100

    Make timeout Optional for wait_for_operation (#20981)
---
 airflow/providers/google/cloud/hooks/dataproc.py     | 2 +-
 airflow/providers/google/cloud/operators/dataproc.py | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 6b3db8d..e80e784 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -277,7 +277,7 @@ class DataprocHook(GoogleBaseHook):
             credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
         )
 
-    def wait_for_operation(self, timeout: float, operation: Operation):
+    def wait_for_operation(self, operation: Operation, timeout: Optional[float] = None):
         """Waits for long-lasting operation to complete."""
         try:
             return operation.result(timeout=timeout)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index c58ab13..f5112c9 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2101,9 +2101,7 @@ class DataprocCreateBatchOperator(BaseOperator):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
-            if self.timeout is None:
-                raise AirflowException('Timeout should be set here')
-            result = hook.wait_for_operation(self.timeout, self.operation)
+            result = hook.wait_for_operation(timeout=self.timeout, operation=self.operation)
             self.log.info("Batch %s created", self.batch_id)
         except AlreadyExists:
             self.log.info("Batch with given id already exists")