You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 20:05:38 UTC
[airflow] branch main updated: Make timeout Optional for wait_for_operation (#20981)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 86ef016 Make timeout Optional for wait_for_operation (#20981)
86ef016 is described below
commit 86ef016eabd90819163503ef07c0da50373142ad
Author: Maksim <ma...@google.com>
AuthorDate: Sun Jan 23 21:05:00 2022 +0100
Make timeout Optional for wait_for_operation (#20981)
---
airflow/providers/google/cloud/hooks/dataproc.py | 2 +-
airflow/providers/google/cloud/operators/dataproc.py | 4 +---
2 files changed, 2 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 6b3db8d..e80e784 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -277,7 +277,7 @@ class DataprocHook(GoogleBaseHook):
credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
)
- def wait_for_operation(self, timeout: float, operation: Operation):
+ def wait_for_operation(self, operation: Operation, timeout: Optional[float] = None):
"""Waits for long-lasting operation to complete."""
try:
return operation.result(timeout=timeout)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index c58ab13..f5112c9 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2101,9 +2101,7 @@ class DataprocCreateBatchOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
- if self.timeout is None:
- raise AirflowException('Timeout should be set here')
- result = hook.wait_for_operation(self.timeout, self.operation)
+ result = hook.wait_for_operation(timeout=self.timeout, operation=self.operation)
self.log.info("Batch %s created", self.batch_id)
except AlreadyExists:
self.log.info("Batch with given id already exists")