Posted to commits@airflow.apache.org by po...@apache.org on 2023/09/03 21:48:01 UTC

[airflow] branch main updated: Move the try outside the loop when this is possible in Google provider (#33976)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f20b0f442 Move the try outside the loop when this is possible in Google provider (#33976)
4f20b0f442 is described below

commit 4f20b0f4421f44703f25d5b2970ba065a631d912
Author: Hussein Awala <hu...@awala.fr>
AuthorDate: Sun Sep 3 23:47:54 2023 +0200

    Move the try outside the loop when this is possible in Google provider (#33976)
    
    * Move the try outside the loop when this is possible in Google provider
    
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 .../providers/google/cloud/triggers/bigquery.py    | 66 ++++++++++------------
 .../google/cloud/triggers/bigquery_dts.py          | 19 +++----
 .../providers/google/cloud/triggers/cloud_batch.py | 14 ++---
 .../providers/google/cloud/triggers/cloud_build.py | 11 ++--
 .../providers/google/cloud/triggers/cloud_sql.py   | 20 +++----
 .../providers/google/cloud/triggers/dataflow.py    | 11 ++--
 .../providers/google/cloud/triggers/datafusion.py  | 11 ++--
 .../providers/google/cloud/triggers/dataproc.py    | 38 ++++++-------
 .../google/cloud/triggers/kubernetes_engine.py     | 21 ++++---
 .../providers/google/cloud/triggers/mlengine.py    | 10 ++--
 10 files changed, 103 insertions(+), 118 deletions(-)
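
The refactoring is the same in each file below: the try/except that used to sit inside the polling loop now wraps the loop itself, so the error handler runs at most once and the explicit return after the error event becomes unnecessary in most triggers. A minimal before/after sketch of the pattern (simplified; get_job_status here is a hypothetical stand-in for the various async hook calls, not provider code):

    import asyncio
    from typing import Any, AsyncIterator

    async def get_job_status() -> str:
        # Hypothetical stand-in for the async hook calls used by the real triggers.
        await asyncio.sleep(0)
        return "success"

    async def run_before() -> AsyncIterator[dict[str, Any]]:
        # Old shape: the try/except is re-entered on every iteration, and the
        # handler needs an explicit `return` so the generator stops after
        # yielding the error event.
        while True:
            try:
                status = await get_job_status()
                if status == "success":
                    yield {"status": "success"}
                    return
                await asyncio.sleep(1)
            except Exception as e:
                yield {"status": "error", "message": str(e)}
                return

    async def run_after() -> AsyncIterator[dict[str, Any]]:
        # New shape: a single try wraps the whole loop; falling off the end of
        # the except block ends the async generator, so the trailing `return`
        # is no longer needed.
        try:
            while True:
                status = await get_job_status()
                if status == "success":
                    yield {"status": "success"}
                    return
                await asyncio.sleep(1)
        except Exception as e:
            yield {"status": "error", "message": str(e)}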

diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index edafddf16e..1f80479d8b 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -75,8 +75,8 @@ class BigQueryInsertJobTrigger(BaseTrigger):
         """Gets current job execution status and yields a TriggerEvent."""
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                 if job_status == "success":
                     yield TriggerEvent(
@@ -95,10 +95,9 @@ class BigQueryInsertJobTrigger(BaseTrigger):
                         "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
                     )
                     await asyncio.sleep(self.poll_interval)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     def _get_async_hook(self) -> BigQueryAsyncHook:
         return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -124,8 +123,8 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                 if job_status == "success":
@@ -160,10 +159,9 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
                         "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
                     )
                     await asyncio.sleep(self.poll_interval)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
 
 class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -196,8 +194,8 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current job execution status and yields a TriggerEvent with response data."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                 if job_status == "success":
@@ -220,10 +218,9 @@ class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
                         "Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
                     )
                     await asyncio.sleep(self.poll_interval)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
 
 class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -302,8 +299,8 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 first_job_response_from_hook = await hook.get_job_status(
                     job_id=self.first_job_id, project_id=self.project_id
                 )
@@ -365,10 +362,9 @@ class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
                     )
                     return
 
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
 
 class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -430,8 +426,8 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                 if response_from_hook == "success":
@@ -448,10 +444,9 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
                 else:
                     yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
                     return
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
 
 class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -501,8 +496,8 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
 
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Will run until the table exists in the Google Big Query."""
-        while True:
-            try:
+        try:
+            while True:
                 hook = self._get_async_hook()
                 response = await self._table_exists(
                     hook=hook, dataset=self.dataset_id, table_id=self.table_id, project_id=self.project_id
@@ -511,10 +506,9 @@ class BigQueryTableExistenceTrigger(BaseTrigger):
                     yield TriggerEvent({"status": "success", "message": "success"})
                     return
                 await asyncio.sleep(self.poll_interval)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for Table existence")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for Table existence")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     async def _table_exists(
         self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, project_id: str
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index d5a920a762..16d8ff7b34 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -83,8 +83,8 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """If the Transfer Run is in a terminal state, then yield TriggerEvent object."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 transfer_run: TransferRun = await hook.get_transfer_run(
                     project_id=self.project_id,
                     config_id=self.config_id,
@@ -129,14 +129,13 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                     self.log.info("Job is still working...")
                     self.log.info("Waiting for %s seconds", self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
-            except Exception as e:
-                yield TriggerEvent(
-                    {
-                        "status": "failed",
-                        "message": f"Trigger failed with exception: {e}",
-                    }
-                )
-                return
+        except Exception as e:
+            yield TriggerEvent(
+                {
+                    "status": "failed",
+                    "message": f"Trigger failed with exception: {e}",
+                }
+            )
 
     def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
         return AsyncBiqQueryDataTransferServiceHook(
diff --git a/airflow/providers/google/cloud/triggers/cloud_batch.py b/airflow/providers/google/cloud/triggers/cloud_batch.py
index 211e436c95..3ae6211fd3 100644
--- a/airflow/providers/google/cloud/triggers/cloud_batch.py
+++ b/airflow/providers/google/cloud/triggers/cloud_batch.py
@@ -92,9 +92,8 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
         """
         timeout = self.timeout
         hook = self._get_async_hook()
-        while timeout is None or timeout > 0:
-
-            try:
+        try:
+            while timeout is None or timeout > 0:
                 job: Job = await hook.get_batch_job(job_name=self.job_name)
 
                 status: JobStatus.State = job.status.state
@@ -134,10 +133,10 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
                     if timeout is None or timeout > 0:
                         await asyncio.sleep(self.polling_period_seconds)
 
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for job completion.")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for job completion.")
+            yield TriggerEvent({"status": "error", "message": str(e)})
+            return
 
         self.log.exception(f"Job with name [{self.job_name}] timed out")
         yield TriggerEvent(
@@ -147,7 +146,6 @@ class CloudBatchJobFinishedTrigger(BaseTrigger):
                 "message": f"Batch job with name {self.job_name} timed out",
             }
         )
-        return
 
     def _get_async_hook(self) -> CloudBatchAsyncHook:
         return CloudBatchAsyncHook(
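
Note that the Cloud Batch trigger above is the one case where the except handler keeps its explicit return: a timeout event is yielded after the try block, so the handler has to bail out early to avoid emitting a second event. A rough sketch of that shape (hypothetical poll_once in place of hook.get_batch_job, string statuses instead of JobStatus.State):

    import asyncio
    from typing import AsyncIterator

    async def poll_once() -> str:
        # Hypothetical poll; never reports success here, so the sketch
        # always reaches the timeout path.
        await asyncio.sleep(0)
        return "RUNNING"

    async def run(timeout: int = 2) -> AsyncIterator[dict[str, str]]:
        try:
            while timeout > 0:
                if await poll_once() == "SUCCEEDED":
                    yield {"status": "success"}
                    return
                timeout -= 1
        except Exception as e:
            yield {"status": "error", "message": str(e)}
            return  # without this, the timeout event below would also fire
        yield {"status": "error", "message": "job timed out"}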
diff --git a/airflow/providers/google/cloud/triggers/cloud_build.py b/airflow/providers/google/cloud/triggers/cloud_build.py
index e07dc93907..dddb9d823a 100644
--- a/airflow/providers/google/cloud/triggers/cloud_build.py
+++ b/airflow/providers/google/cloud/triggers/cloud_build.py
@@ -78,8 +78,8 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current build execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 cloud_build_instance = await hook.get_cloud_build(
                     id_=self.id_,
@@ -119,10 +119,9 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
                     )
                     return
 
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for Cloud Build completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for Cloud Build completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     def _get_async_hook(self) -> CloudBuildAsyncHook:
         return CloudBuildAsyncHook(gcp_conn_id=self.gcp_conn_id)
diff --git a/airflow/providers/google/cloud/triggers/cloud_sql.py b/airflow/providers/google/cloud/triggers/cloud_sql.py
index e04ada9277..be1cd739d2 100644
--- a/airflow/providers/google/cloud/triggers/cloud_sql.py
+++ b/airflow/providers/google/cloud/triggers/cloud_sql.py
@@ -64,8 +64,8 @@ class CloudSQLExportTrigger(BaseTrigger):
         )
 
     async def run(self):
-        while True:
-            try:
+        try:
+            while True:
                 operation = await self.hook.get_operation(
                     project_id=self.project_id, operation_name=self.operation_name
                 )
@@ -93,11 +93,11 @@ class CloudSQLExportTrigger(BaseTrigger):
                         self.poke_interval,
                     )
                     await asyncio.sleep(self.poke_interval)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking operation status.")
-                yield TriggerEvent(
-                    {
-                        "status": "failed",
-                        "message": str(e),
-                    }
-                )
+        except Exception as e:
+            self.log.exception("Exception occurred while checking operation status.")
+            yield TriggerEvent(
+                {
+                    "status": "failed",
+                    "message": str(e),
+                }
+            )
diff --git a/airflow/providers/google/cloud/triggers/dataflow.py b/airflow/providers/google/cloud/triggers/dataflow.py
index 5dfdf5106a..30f42dfdb1 100644
--- a/airflow/providers/google/cloud/triggers/dataflow.py
+++ b/airflow/providers/google/cloud/triggers/dataflow.py
@@ -92,8 +92,8 @@ class TemplateJobStartTrigger(BaseTrigger):
         amount of time stored in self.poll_sleep variable.
         """
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 status = await hook.get_job_status(
                     project_id=self.project_id,
                     job_id=self.job_id,
@@ -129,10 +129,9 @@ class TemplateJobStartTrigger(BaseTrigger):
                     self.log.info("Current job status is: %s", status)
                     self.log.info("Sleeping for %s seconds.", self.poll_sleep)
                     await asyncio.sleep(self.poll_sleep)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for job completion.")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for job completion.")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     def _get_async_hook(self) -> AsyncDataflowHook:
         return AsyncDataflowHook(
diff --git a/airflow/providers/google/cloud/triggers/datafusion.py b/airflow/providers/google/cloud/triggers/datafusion.py
index 66ed139f34..06bf5e053e 100644
--- a/airflow/providers/google/cloud/triggers/datafusion.py
+++ b/airflow/providers/google/cloud/triggers/datafusion.py
@@ -83,8 +83,8 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current pipeline status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 response_from_hook = await hook.get_pipeline_status(
                     success_states=self.success_states,
@@ -109,10 +109,9 @@ class DataFusionStartPipelineTrigger(BaseTrigger):
                 else:
                     yield TriggerEvent({"status": "error", "message": response_from_hook})
                     return
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for pipeline state")
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for pipeline state")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     def _get_async_hook(self) -> DataFusionAsyncHook:
         return DataFusionAsyncHook(
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 3f94c49965..adb34cae03 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -263,8 +263,8 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Wait until cluster is deleted completely."""
-        while self.end_time > time.time():
-            try:
+        try:
+            while self.end_time > time.time():
                 cluster = await self.get_async_hook().get_cluster(
                     region=self.region,  # type: ignore[arg-type]
                     cluster_name=self.cluster_name,
@@ -277,13 +277,12 @@ class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
                     self.polling_interval_seconds,
                 )
                 await asyncio.sleep(self.polling_interval_seconds)
-            except NotFound:
-                yield TriggerEvent({"status": "success", "message": ""})
-                return
-            except Exception as e:
-                yield TriggerEvent({"status": "error", "message": str(e)})
-                return
-        yield TriggerEvent({"status": "error", "message": "Timeout"})
+        except NotFound:
+            yield TriggerEvent({"status": "success", "message": ""})
+        except Exception as e:
+            yield TriggerEvent({"status": "error", "message": str(e)})
+        else:
+            yield TriggerEvent({"status": "error", "message": "Timeout"})
 
 
 class DataprocWorkflowTrigger(DataprocBaseTrigger):
@@ -312,8 +311,8 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         hook = self.get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 operation = await hook.get_operation(region=self.region, operation_name=self.name)
                 if operation.done:
                     if operation.error.message:
@@ -338,12 +337,11 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
                 else:
                     self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
                     await asyncio.sleep(self.polling_interval_seconds)
-            except Exception as e:
-                self.log.exception("Exception occurred while checking operation status.")
-                yield TriggerEvent(
-                    {
-                        "status": "failed",
-                        "message": str(e),
-                    }
-                )
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking operation status.")
+            yield TriggerEvent(
+                {
+                    "status": "failed",
+                    "message": str(e),
+                }
+            )
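
One control-flow detail in the Dataproc delete trigger above: the timeout event moved into an else clause on the try statement, which runs only when the polling loop ends without raising. Schematically (hypothetical names; NotFound here stands in for the google.api_core exception):

    import asyncio
    import time
    from typing import AsyncIterator

    class NotFound(Exception):
        # Hypothetical stand-in for google.api_core.exceptions.NotFound.
        pass

    async def get_cluster() -> str:
        # Hypothetical poll; raises NotFound once the cluster has been deleted.
        raise NotFound("cluster gone")

    async def run_delete(end_time: float, poll: float = 1.0) -> AsyncIterator[dict[str, str]]:
        try:
            while end_time > time.time():
                await get_cluster()      # raises NotFound when deletion completes
                await asyncio.sleep(poll)
        except NotFound:
            yield {"status": "success", "message": ""}
        except Exception as e:
            yield {"status": "error", "message": str(e)}
        else:
            # Reached only if the loop ended without an exception, i.e. the
            # deadline expired while the cluster still existed.
            yield {"status": "error", "message": "Timeout"}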
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index d47538a05f..988ccd5558 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -184,8 +184,8 @@ class GKEOperationTrigger(BaseTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets operation status and yields corresponding event."""
         hook = self._get_hook()
-        while True:
-            try:
+        try:
+            while True:
                 operation = await hook.get_operation(
                     operation_name=self.operation_name,
                     project_id=self.project_id,
@@ -214,15 +214,14 @@ class GKEOperationTrigger(BaseTrigger):
                         }
                     )
                     return
-            except Exception as e:
-                self.log.exception("Exception occurred while checking operation status")
-                yield TriggerEvent(
-                    {
-                        "status": "error",
-                        "message": str(e),
-                    }
-                )
-                return
+        except Exception as e:
+            self.log.exception("Exception occurred while checking operation status")
+            yield TriggerEvent(
+                {
+                    "status": "error",
+                    "message": str(e),
+                }
+            )
 
     def _get_hook(self) -> GKEAsyncHook:
         if self._hook is None:
diff --git a/airflow/providers/google/cloud/triggers/mlengine.py b/airflow/providers/google/cloud/triggers/mlengine.py
index 76c542a5bf..d5c6cd60ca 100644
--- a/airflow/providers/google/cloud/triggers/mlengine.py
+++ b/airflow/providers/google/cloud/triggers/mlengine.py
@@ -91,8 +91,8 @@ class MLEngineStartTrainingJobTrigger(BaseTrigger):
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Gets current job execution status and yields a TriggerEvent."""
         hook = self._get_async_hook()
-        while True:
-            try:
+        try:
+            while True:
                 # Poll for job execution status
                 response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                 if response_from_hook == "success":
@@ -110,9 +110,9 @@ class MLEngineStartTrainingJobTrigger(BaseTrigger):
                 else:
                     yield TriggerEvent({"status": "error", "message": response_from_hook})
 
-            except Exception as e:
-                self.log.exception("Exception occurred while checking for query completion")
-                yield TriggerEvent({"status": "error", "message": str(e)})
+        except Exception as e:
+            self.log.exception("Exception occurred while checking for query completion")
+            yield TriggerEvent({"status": "error", "message": str(e)})
 
     def _get_async_hook(self) -> MLEngineAsyncHook:
         return MLEngineAsyncHook(