You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/09/03 21:48:01 UTC
[airflow] branch main updated: Move the try outside the loop when this is possible in Google provider (#33976)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4f20b0f442 Move the try outside the loop when this is possible in Google provider (#33976)
4f20b0f442 is described below
commit 4f20b0f4421f44703f25d5b2970ba065a631d912
Author: Hussein Awala <hu...@awala.fr>
AuthorDate: Sun Sep 3 23:47:54 2023 +0200
Move the try outside the loop when this is possible in Google provider (#33976)
* Move the try outside the loop when this is possible in Google provider
---------
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
.../providers/google/cloud/triggers/bigquery.py | 66 ++++++++++------------
.../google/cloud/triggers/bigquery_dts.py | 19 +++----
.../providers/google/cloud/triggers/cloud_batch.py | 14 ++---
.../providers/google/cloud/triggers/cloud_build.py | 11 ++--
.../providers/google/cloud/triggers/cloud_sql.py | 20 +++----
.../providers/google/cloud/triggers/dataflow.py | 11 ++--
.../providers/google/cloud/triggers/datafusion.py | 11 ++--
.../providers/google/cloud/triggers/dataproc.py | 38 ++++++-------
.../google/cloud/triggers/kubernetes_engine.py | 21 ++++---
.../providers/google/cloud/triggers/mlengine.py | 10 ++--
10 files changed, 103 insertions(+), 118 deletions(-)
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index edafddf16e..1f80479d8b 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -75,8 +75,8 @@ class BigQueryInsertJobTrigger(BaseTrigger):
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
yield TriggerEvent(
@@ -95,10 +95,9 @@ class BigQueryInsertJobTrigger(BaseTrigger):
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
)
await asyncio.sleep(self.poll_interval)
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
def _get_async_hook(self) -> BigQueryAsyncHook:
return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -124,8 +123,8 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
@@ -160,10 +159,9 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
)
await asyncio.sleep(self.poll_interval)
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -196,8 +194,8 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent with response data."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if job_status == "success":
@@ -220,10 +218,9 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
)
await asyncio.sleep(self.poll_interval)
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -302,8 +299,8 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
first_job_response_from_hook = await hook.get_job_status(
job_id=self.first_job_id, project_id=self.project_id
)
@@ -365,10 +362,9 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
)
return
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -430,8 +426,8 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if response_from_hook == "success":
@@ -448,10 +444,9 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
else:
yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
return
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -501,8 +496,8 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Will run until the table exists in the Google Big Query."""
- while True:
- try:
+ try:
+ while True:
hook = self._get_async_hook()
response = await self._table_exists(
hook=hook, dataset=self.dataset_id, table_id=self.table_id, project_id=self.project_id
@@ -511,10 +506,9 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
yield TriggerEvent({"status": "success", "message": "success"})
return
await asyncio.sleep(self.poll_interval)
- except Exception as e:
- self.log.exception("Exception occurred while checking for Table existence")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for Table existence")
+ yield TriggerEvent({"status": "error", "message": str(e)})
async def _table_exists(
self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, project_id: str
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index d5a920a762..16d8ff7b34 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -83,8 +83,8 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
"""If the Transfer Run is in a terminal state, then yield TriggerEvent object."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
transfer_run: TransferRun = await hook.get_transfer_run(
project_id=self.project_id,
config_id=self.config_id,
@@ -129,14 +129,13 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
self.log.info("Job is still working...")
self.log.info("Waiting for %s seconds", self.poll_interval)
await asyncio.sleep(self.poll_interval)
- except Exception as e:
- yield TriggerEvent(
- {
- "status": "failed",
- "message": f"Trigger failed with exception: {e}",
- }
- )
- return
+ except Exception as e:
+ yield TriggerEvent(
+ {
+ "status": "failed",
+ "message": f"Trigger failed with exception: {e}",
+ }
+ )
def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
return AsyncBiqQueryDataTransferServiceHook(
diff --git a/airflow/providers/google/cloud/triggers/cloud_batch.py b/airflow/providers/google/cloud/triggers/cloud_batch.py
index 211e436c95..3ae6211fd3 100644
--- a/airflow/providers/google/cloud/triggers/cloud_batch.py
+++ b/airflow/providers/google/cloud/triggers/cloud_batch.py
@@ -92,9 +92,8 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
"""
timeout = self.timeout
hook = self._get_async_hook()
- while timeout is None or timeout > 0:
-
- try:
+ try:
+ while timeout is None or timeout > 0:
job: Job = await hook.get_batch_job(job_name=self.job_name)
status: JobStatus.State = job.status.state
@@ -134,10 +133,10 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
if timeout is None or timeout > 0:
await asyncio.sleep(self.polling_period_seconds)
- except Exception as e:
- self.log.exception("Exception occurred while checking for job completion.")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for job completion.")
+ yield TriggerEvent({"status": "error", "message": str(e)})
+ return
self.log.exception(f"Job with name [{self.job_name}] timed out")
yield TriggerEvent(
@@ -147,7 +146,6 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
"message": f"Batch job with name {self.job_name} timed out",
}
)
- return
def _get_async_hook(self) -> CloudBatchAsyncHook:
return CloudBatchAsyncHook(
diff --git a/airflow/providers/google/cloud/triggers/cloud_build.py b/airflow/providers/google/cloud/triggers/cloud_build.py
index e07dc93907..dddb9d823a 100644
--- a/airflow/providers/google/cloud/triggers/cloud_build.py
+++ b/airflow/providers/google/cloud/triggers/cloud_build.py
@@ -78,8 +78,8 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current build execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
cloud_build_instance = await hook.get_cloud_build(
id_=self.id_,
@@ -119,10 +119,9 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
)
return
- except Exception as e:
- self.log.exception("Exception occurred while checking for Cloud Build completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for Cloud Build completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
def _get_async_hook(self) -> CloudBuildAsyncHook:
return CloudBuildAsyncHook(gcp_conn_id=self.gcp_conn_id)
diff --git a/airflow/providers/google/cloud/triggers/cloud_sql.py b/airflow/providers/google/cloud/triggers/cloud_sql.py
index e04ada9277..be1cd739d2 100644
--- a/airflow/providers/google/cloud/triggers/cloud_sql.py
+++ b/airflow/providers/google/cloud/triggers/cloud_sql.py
@@ -64,8 +64,8 @@ class CloudSQLExportTrigger(BaseTrigger):
)
async def run(self):
- while True:
- try:
+ try:
+ while True:
operation = await self.hook.get_operation(
project_id=self.project_id, operation_name=self.operation_name
)
@@ -93,11 +93,11 @@ class CloudSQLExportTrigger(BaseTrigger):
self.poke_interval,
)
await asyncio.sleep(self.poke_interval)
- except Exception as e:
- self.log.exception("Exception occurred while checking operation status.")
- yield TriggerEvent(
- {
- "status": "failed",
- "message": str(e),
- }
- )
+ except Exception as e:
+ self.log.exception("Exception occurred while checking operation status.")
+ yield TriggerEvent(
+ {
+ "status": "failed",
+ "message": str(e),
+ }
+ )
diff --git a/airflow/providers/google/cloud/triggers/dataflow.py b/airflow/providers/google/cloud/triggers/dataflow.py
index 5dfdf5106a..30f42dfdb1 100644
--- a/airflow/providers/google/cloud/triggers/dataflow.py
+++ b/airflow/providers/google/cloud/triggers/dataflow.py
@@ -92,8 +92,8 @@ class TemplateJobStartTrigger(BaseTrigger):
amount of time stored in self.poll_sleep variable.
"""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
status = await hook.get_job_status(
project_id=self.project_id,
job_id=self.job_id,
@@ -129,10 +129,9 @@ class TemplateJobStartTrigger(BaseTrigger):
self.log.info("Current job status is: %s", status)
self.log.info("Sleeping for %s seconds.", self.poll_sleep)
await asyncio.sleep(self.poll_sleep)
- except Exception as e:
- self.log.exception("Exception occurred while checking for job completion.")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for job completion.")
+ yield TriggerEvent({"status": "error", "message": str(e)})
def _get_async_hook(self) -> AsyncDataflowHook:
return AsyncDataflowHook(
diff --git a/airflow/providers/google/cloud/triggers/datafusion.py b/airflow/providers/google/cloud/triggers/datafusion.py
index 66ed139f34..06bf5e053e 100644
--- a/airflow/providers/google/cloud/triggers/datafusion.py
+++ b/airflow/providers/google/cloud/triggers/datafusion.py
@@ -83,8 +83,8 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pipeline status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
response_from_hook = await hook.get_pipeline_status(
success_states=self.success_states,
@@ -109,10 +109,9 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
else:
yield TriggerEvent({"status": "error", "message": response_from_hook})
return
- except Exception as e:
- self.log.exception("Exception occurred while checking for pipeline state")
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for pipeline state")
+ yield TriggerEvent({"status": "error", "message": str(e)})
def _get_async_hook(self) -> DataFusionAsyncHook:
return DataFusionAsyncHook(
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 3f94c49965..adb34cae03 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -263,8 +263,8 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Wait until cluster is deleted completely."""
- while self.end_time > time.time():
- try:
+ try:
+ while self.end_time > time.time():
cluster = await self.get_async_hook().get_cluster(
region=self.region, # type: ignore[arg-type]
cluster_name=self.cluster_name,
@@ -277,13 +277,12 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
self.polling_interval_seconds,
)
await asyncio.sleep(self.polling_interval_seconds)
- except NotFound:
- yield TriggerEvent({"status": "success", "message": ""})
- return
- except Exception as e:
- yield TriggerEvent({"status": "error", "message": str(e)})
- return
- yield TriggerEvent({"status": "error", "message": "Timeout"})
+ except NotFound:
+ yield TriggerEvent({"status": "success", "message": ""})
+ except Exception as e:
+ yield TriggerEvent({"status": "error", "message": str(e)})
+ else:
+ yield TriggerEvent({"status": "error", "message": "Timeout"})
class DataprocWorkflowTrigger(DataprocBaseTrigger):
@@ -312,8 +311,8 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
hook = self.get_async_hook()
- while True:
- try:
+ try:
+ while True:
operation = await hook.get_operation(region=self.region, operation_name=self.name)
if operation.done:
if operation.error.message:
@@ -338,12 +337,11 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
else:
self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
await asyncio.sleep(self.polling_interval_seconds)
- except Exception as e:
- self.log.exception("Exception occurred while checking operation status.")
- yield TriggerEvent(
- {
- "status": "failed",
- "message": str(e),
- }
- )
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking operation status.")
+ yield TriggerEvent(
+ {
+ "status": "failed",
+ "message": str(e),
+ }
+ )
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index d47538a05f..988ccd5558 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -184,8 +184,8 @@ class GKEOperationTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets operation status and yields corresponding event."""
hook = self._get_hook()
- while True:
- try:
+ try:
+ while True:
operation = await hook.get_operation(
operation_name=self.operation_name,
project_id=self.project_id,
@@ -214,15 +214,14 @@ class GKEOperationTrigger(BaseTrigger):
}
)
return
- except Exception as e:
- self.log.exception("Exception occurred while checking operation status")
- yield TriggerEvent(
- {
- "status": "error",
- "message": str(e),
- }
- )
- return
+ except Exception as e:
+ self.log.exception("Exception occurred while checking operation status")
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": str(e),
+ }
+ )
def _get_hook(self) -> GKEAsyncHook:
if self._hook is None:
diff --git a/airflow/providers/google/cloud/triggers/mlengine.py b/airflow/providers/google/cloud/triggers/mlengine.py
index 76c542a5bf..d5c6cd60ca 100644
--- a/airflow/providers/google/cloud/triggers/mlengine.py
+++ b/airflow/providers/google/cloud/triggers/mlengine.py
@@ -91,8 +91,8 @@ class MLEngineStartTrainingJobTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
- while True:
- try:
+ try:
+ while True:
# Poll for job execution status
response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
if response_from_hook == "success":
@@ -110,9 +110,9 @@ class MLEngineStartTrainingJobTrigger(BaseTrigger):
else:
yield TriggerEvent({"status": "error", "message": response_from_hook})
- except Exception as e:
- self.log.exception("Exception occurred while checking for query completion")
- yield TriggerEvent({"status": "error", "message": str(e)})
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for query completion")
+ yield TriggerEvent({"status": "error", "message": str(e)})
def _get_async_hook(self) -> MLEngineAsyncHook:
return MLEngineAsyncHook(