Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/12 10:24:43 UTC

[airflow] branch master updated: Google cloud operator strict type check (#11450)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 06141d6  Google cloud operator strict type check (#11450)
06141d6 is described below

commit 06141d6d01398115e5e54c5766a46ae5514ba2f7
Author: Satyasheel <ml...@users.noreply.github.com>
AuthorDate: Mon Oct 12 11:23:35 2020 +0100

    Google cloud operator strict type check (#11450)
    
    import optimisation
---
 .../providers/google/cloud/operators/bigquery.py   | 31 +++++----
 .../google/cloud/operators/bigquery_dts.py         |  8 +--
 .../providers/google/cloud/operators/bigtable.py   | 14 ++--
 .../google/cloud/operators/cloud_build.py          | 14 ++--
 .../google/cloud/operators/cloud_memorystore.py    | 22 +++----
 .../providers/google/cloud/operators/cloud_sql.py  | 48 +++++++-------
 .../operators/cloud_storage_transfer_service.py    | 74 +++++++++++-----------
 .../providers/google/cloud/operators/compute.py    | 18 +++---
 .../google/cloud/operators/datacatalog.py          | 42 ++++++------
 .../providers/google/cloud/operators/dataflow.py   | 10 +--
 .../providers/google/cloud/operators/datafusion.py | 20 +++---
 .../providers/google/cloud/operators/dataprep.py   |  8 +--
 .../providers/google/cloud/operators/dataproc.py   | 16 ++---
 .../providers/google/cloud/operators/datastore.py  | 14 ++--
 airflow/providers/google/cloud/operators/dlp.py    | 10 +--
 .../providers/google/cloud/operators/functions.py  | 24 +++----
 airflow/providers/google/cloud/operators/gcs.py    | 14 ++--
 .../google/cloud/operators/kubernetes_engine.py    | 10 +--
 .../google/cloud/operators/life_sciences.py        |  4 +-
 airflow/providers/google/cloud/operators/pubsub.py | 14 ++--
 .../providers/google/cloud/operators/spanner.py    | 24 +++----
 .../google/cloud/operators/speech_to_text.py       |  2 +-
 .../google/cloud/operators/stackdriver.py          | 20 +++---
 .../google/cloud/operators/text_to_speech.py       |  4 +-
 .../providers/google/cloud/operators/translate.py  |  2 +-
 .../google/cloud/operators/translate_speech.py     |  2 +-
 26 files changed, 237 insertions(+), 232 deletions(-)
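
The bulk of the patch follows a single pattern: adding explicit parameter and return annotations so these modules pass a strict mypy run. A minimal sketch of that pattern (illustrative only; the exact checker configuration is an assumption, here mypy's --disallow-untyped-defs flag):

    # Illustrative sketch, not part of the patch. Under a strict run such as
    #   mypy --disallow-untyped-defs example.py
    # an unannotated def is rejected, so the commit adds explicit annotations
    # throughout the operator modules.


    class ExampleOperator:
        def execute(self, context: dict) -> None:  # was: def execute(self, context):
            print("running with", context)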

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 93e7db0..0686b33 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -26,6 +26,7 @@ import json
 import re
 import uuid
 import warnings
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, SupportsAbs, Union
 
 import attr
@@ -81,7 +82,7 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
     def name(self) -> str:
         return f'BigQuery Console #{self.index + 1}'
 
-    def get_link(self, operator, dttm):
+    def get_link(self, operator: BaseOperator, dttm: datetime):
         ti = TaskInstance(task=operator, execution_date=dttm)
         job_ids = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
         if not job_ids:
@@ -466,7 +467,7 @@ class BigQueryGetDataOperator(BaseOperator):
         self.location = location
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> list:
         self.log.info(
             'Fetching Data from %s.%s max results: %s', self.dataset_id, self.table_id, self.max_results
         )
@@ -741,7 +742,7 @@ class BigQueryExecuteQueryOperator(BaseOperator):
             )
         context['task_instance'].xcom_push(key='job_id', value=job_id)
 
-    def on_kill(self):
+    def on_kill(self) -> None:
         super().on_kill()
         if self.hook is not None:
             self.log.info('Cancelling running query')
@@ -931,7 +932,7 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
         self.table_resource = table_resource
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         bq_hook = BigQueryHook(
             gcp_conn_id=self.bigquery_conn_id,
             delegate_to=self.delegate_to,
@@ -946,7 +947,9 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
                 delegate_to=self.delegate_to,
                 impersonation_chain=self.impersonation_chain,
             )
-            schema_fields = json.loads(gcs_hook.download(gcs_bucket, gcs_object).decode("utf-8"))
+            schema_fields = json.loads(
+                gcs_hook.download(gcs_bucket, gcs_object).decode("utf-8")  # type: ignore[attr-defined]
+            )  # type: ignore[attr-defined]
         else:
             schema_fields = self.schema_fields
 
@@ -1172,7 +1175,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
         self.location = location
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         bq_hook = BigQueryHook(
             gcp_conn_id=self.bigquery_conn_id,
             delegate_to=self.delegate_to,
@@ -1187,7 +1190,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
                 impersonation_chain=self.impersonation_chain,
             )
             schema_object = gcs_hook.download(self.bucket, self.schema_object)
-            schema_fields = json.loads(schema_object.decode("utf-8"))
+            schema_fields = json.loads(schema_object.decode("utf-8"))  # type: ignore[attr-defined]
         else:
             schema_fields = self.schema_fields
 
@@ -1309,7 +1312,7 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
 
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self.log.info('Dataset id: %s Project id: %s', self.dataset_id, self.project_id)
 
         bq_hook = BigQueryHook(
@@ -1413,7 +1416,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
 
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         bq_hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -1828,7 +1831,7 @@ class BigQueryDeleteTableOperator(BaseOperator):
         self.location = location
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self.log.info('Deleting: %s', self.deletion_dataset_table)
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -1919,7 +1922,7 @@ class BigQueryUpsertTableOperator(BaseOperator):
         self.location = location
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self.log.info('Upserting Dataset: %s with table_resource: %s', self.dataset_id, self.table_resource)
         hook = BigQueryHook(
             bigquery_conn_id=self.gcp_conn_id,
@@ -2107,6 +2110,8 @@ class BigQueryInsertJobOperator(BaseOperator):
         self.job_id = job.job_id
         return job.job_id
 
-    def on_kill(self):
+    def on_kill(self) -> None:
         if self.job_id and self.cancel_on_kill:
-            self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, location=self.location)
+            self.hook.cancel_job(  # type: ignore[union-attr]
+                job_id=self.job_id, project_id=self.project_id, location=self.location
+            )
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py b/airflow/providers/google/cloud/operators/bigquery_dts.py
index 6bdba6e..1c401fc 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -88,7 +88,7 @@ class BigQueryCreateDataTransferOperator(BaseOperator):
         gcp_conn_id="google_cloud_default",
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.transfer_config = transfer_config
         self.authorization_code = authorization_code
@@ -172,7 +172,7 @@ class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
         gcp_conn_id="google_cloud_default",
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.project_id = project_id
         self.transfer_config_id = transfer_config_id
@@ -182,7 +182,7 @@ class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BiqQueryDataTransferServiceHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -265,7 +265,7 @@ class BigQueryDataTransferServiceStartTransferRunsOperator(BaseOperator):
         gcp_conn_id="google_cloud_default",
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.project_id = project_id
         self.transfer_config_id = transfer_config_id
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index f47f6be..ab9cf2e 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -153,7 +153,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -259,7 +259,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -335,7 +335,7 @@ class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -423,7 +423,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _compare_column_families(self, hook, instance):
+    def _compare_column_families(self, hook, instance) -> bool:
         table_column_families = hook.get_column_families_for_table(instance, self.table_id)
         if set(table_column_families.keys()) != set(self.column_families.keys()):
             self.log.error("Table '%s' has different set of Column Families", self.table_id)
@@ -444,7 +444,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
                 return False
         return True
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -533,7 +533,7 @@ class BigtableDeleteTableOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -619,7 +619,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = BigtableHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index 5d40042..5126e35 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -46,10 +46,10 @@ class BuildProcessor:
     :type body: dict
     """
 
-    def __init__(self, body: Dict) -> None:
+    def __init__(self, body: dict) -> None:
         self.body = deepcopy(body)
 
-    def _verify_source(self):
+    def _verify_source(self) -> None:
         is_storage = "storageSource" in self.body["source"]
         is_repo = "repoSource" in self.body["source"]
 
@@ -61,11 +61,11 @@ class BuildProcessor:
                 "storageSource and repoSource."
             )
 
-    def _reformat_source(self):
+    def _reformat_source(self) -> None:
         self._reformat_repo_source()
         self._reformat_storage_source()
 
-    def _reformat_repo_source(self):
+    def _reformat_repo_source(self) -> None:
         if "repoSource" not in self.body["source"]:
             return
 
@@ -76,7 +76,7 @@ class BuildProcessor:
 
         self.body["source"]["repoSource"] = self._convert_repo_url_to_dict(source)
 
-    def _reformat_storage_source(self):
+    def _reformat_storage_source(self) -> None:
         if "storageSource" not in self.body["source"]:
             return
 
@@ -87,7 +87,7 @@ class BuildProcessor:
 
         self.body["source"]["storageSource"] = self._convert_storage_url_to_dict(source)
 
-    def process_body(self):
+    def process_body(self) -> dict:
         """
         Processes the body passed in the constructor
 
@@ -228,7 +228,7 @@ class CloudBuildCreateBuildOperator(BaseOperator):
             if self.body_raw.endswith('.json'):
                 self.body = json.loads(file.read())
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.body:
             raise AirflowException("The required parameter 'body' is missing")
 
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index b151ea1..0600914 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -117,7 +117,7 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -204,7 +204,7 @@ class CloudMemorystoreDeleteInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -298,7 +298,7 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -391,7 +391,7 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -476,7 +476,7 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -572,7 +572,7 @@ class CloudMemorystoreImportOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -661,7 +661,7 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -771,7 +771,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -863,7 +863,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -978,7 +978,7 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1085,7 +1085,7 @@ class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
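
The context: Dict to context: dict changes in this file are equivalent as far as the checker is concerned: an unparameterized typing.Dict is just an alias for the builtin dict. The payoff is that files whose only use of typing was Dict can drop the import, which is the "import optimisation" mentioned in the commit message. Both spellings side by side:

    # Both annotations mean the same thing to mypy; bare typing.Dict is an
    # alias for the builtin dict, so the patch standardises on the builtin.
    from typing import Dict


    def execute_old(context: Dict) -> None:  # pre-patch spelling
        ...


    def execute_new(context: dict) -> None:  # post-patch spelling
        ...
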
diff --git a/airflow/providers/google/cloud/operators/cloud_sql.py b/airflow/providers/google/cloud/operators/cloud_sql.py
index b3cf24e..162264d 100644
--- a/airflow/providers/google/cloud/operators/cloud_sql.py
+++ b/airflow/providers/google/cloud/operators/cloud_sql.py
@@ -232,13 +232,13 @@ class CloudSQLBaseOperator(BaseOperator):
         self._validate_inputs()
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance:
             raise AirflowException("The required parameter 'instance' is empty or None")
 
-    def _check_if_instance_exists(self, instance, hook: CloudSQLHook):
+    def _check_if_instance_exists(self, instance, hook: CloudSQLHook) -> Union[dict, bool]:
         try:
             return hook.get_instance(project_id=self.project_id, instance=instance)
         except HttpError as e:
@@ -247,7 +247,7 @@ class CloudSQLBaseOperator(BaseOperator):
                 return False
             raise e
 
-    def _check_if_db_exists(self, db_name, hook: CloudSQLHook):
+    def _check_if_db_exists(self, db_name, hook: CloudSQLHook) -> Union[dict, bool]:
         try:
             return hook.get_database(project_id=self.project_id, instance=self.instance, database=db_name)
         except HttpError as e:
@@ -335,18 +335,18 @@ class CloudSQLCreateInstanceOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
 
-    def _validate_body_fields(self):
+    def _validate_body_fields(self) -> None:
         if self.validate_body:
             GcpBodyFieldValidator(CLOUD_SQL_CREATE_VALIDATION, api_version=self.api_version).validate(
                 self.body
             )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -435,7 +435,7 @@ class CloudSQLInstancePatchOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
@@ -513,7 +513,7 @@ class CloudSQLDeleteInstanceOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -594,18 +594,18 @@ class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
 
-    def _validate_body_fields(self):
+    def _validate_body_fields(self) -> None:
         if self.validate_body:
             GcpBodyFieldValidator(
                 CLOUD_SQL_DATABASE_CREATE_VALIDATION, api_version=self.api_version
             ).validate(self.body)
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         self._validate_body_fields()
         database = self.body.get("name")
         if not database:
@@ -705,20 +705,20 @@ class CloudSQLPatchInstanceDatabaseOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
         if not self.database:
             raise AirflowException("The required parameter 'database' is empty")
 
-    def _validate_body_fields(self):
+    def _validate_body_fields(self) -> None:
         if self.validate_body:
             GcpBodyFieldValidator(CLOUD_SQL_DATABASE_PATCH_VALIDATION, api_version=self.api_version).validate(
                 self.body
             )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self._validate_body_fields()
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -802,12 +802,12 @@ class CloudSQLDeleteInstanceDatabaseOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.database:
             raise AirflowException("The required parameter 'database' is empty")
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -897,18 +897,18 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
 
-    def _validate_body_fields(self):
+    def _validate_body_fields(self) -> None:
         if self.validate_body:
             GcpBodyFieldValidator(CLOUD_SQL_EXPORT_VALIDATION, api_version=self.api_version).validate(
                 self.body
             )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self._validate_body_fields()
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -1002,18 +1002,18 @@ class CloudSQLImportInstanceOperator(CloudSQLBaseOperator):
             **kwargs,
         )
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         super()._validate_inputs()
         if not self.body:
             raise AirflowException("The required parameter 'body' is empty")
 
-    def _validate_body_fields(self):
+    def _validate_body_fields(self) -> None:
         if self.validate_body:
             GcpBodyFieldValidator(CLOUD_SQL_IMPORT_VALIDATION, api_version=self.api_version).validate(
                 self.body
             )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self._validate_body_fields()
         hook = CloudSQLHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -1077,7 +1077,9 @@ class CloudSQLExecuteQueryOperator(BaseOperator):
         self.parameters = parameters
         self.gcp_connection = None
 
-    def _execute_query(self, hook: CloudSQLDatabaseHook, database_hook: Union[PostgresHook, MySqlHook]):
+    def _execute_query(
+        self, hook: CloudSQLDatabaseHook, database_hook: Union[PostgresHook, MySqlHook]
+    ) -> None:
         cloud_sql_proxy_runner = None
         try:
             if hook.use_proxy:
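
The annotations on _check_if_instance_exists and _check_if_db_exists above make an implicit contract explicit: the helpers return the resource body when the object exists and False when the API answers 404, hence Union[dict, bool]. A reduced sketch with hypothetical names:

    # Hypothetical reduction of the existence-check contract annotated above:
    # return the API resource (a dict) when found, False on a 404-style miss.
    from typing import Union


    def check_if_instance_exists(instance: str, known: dict) -> Union[dict, bool]:
        if instance in known:
            return known[instance]
        return False
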
diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index dc70431..536ebb1 100644
--- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -21,7 +21,7 @@ This module contains Google Cloud Transfer operators.
 """
 from copy import deepcopy
 from datetime import date, time
-from typing import Dict, Optional, Sequence, Union
+from typing import Dict, Optional, Sequence, Union, List
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -67,32 +67,32 @@ class TransferJobPreprocessor:
         self.aws_conn_id = aws_conn_id
         self.default_schedule = default_schedule
 
-    def _inject_aws_credentials(self):
+    def _inject_aws_credentials(self) -> None:
         if TRANSFER_SPEC in self.body and AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]:
             aws_hook = AwsBaseHook(self.aws_conn_id, resource_type="s3")
             aws_credentials = aws_hook.get_credentials()
-            aws_access_key_id = aws_credentials.access_key
-            aws_secret_access_key = aws_credentials.secret_key
+            aws_access_key_id = aws_credentials.access_key  # type: ignore[attr-defined]
+            aws_secret_access_key = aws_credentials.secret_key  # type: ignore[attr-defined]
             self.body[TRANSFER_SPEC][AWS_S3_DATA_SOURCE][AWS_ACCESS_KEY] = {
                 ACCESS_KEY_ID: aws_access_key_id,
                 SECRET_ACCESS_KEY: aws_secret_access_key,
             }
 
-    def _reformat_date(self, field_key):
+    def _reformat_date(self, field_key: str) -> None:
         schedule = self.body[SCHEDULE]
         if field_key not in schedule:
             return
         if isinstance(schedule[field_key], date):
             schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
 
-    def _reformat_time(self, field_key):
+    def _reformat_time(self, field_key: str) -> None:
         schedule = self.body[SCHEDULE]
         if field_key not in schedule:
             return
         if isinstance(schedule[field_key], time):
             schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
 
-    def _reformat_schedule(self):
+    def _reformat_schedule(self) -> None:
         if SCHEDULE not in self.body:
             if self.default_schedule:
                 self.body[SCHEDULE] = {SCHEDULE_START_DATE: date.today(), SCHEDULE_END_DATE: date.today()}
@@ -102,7 +102,7 @@ class TransferJobPreprocessor:
         self._reformat_date(SCHEDULE_END_DATE)
         self._reformat_time(START_TIME_OF_DAY)
 
-    def process_body(self):
+    def process_body(self) -> dict:
         """
         Injects AWS credentials into body if needed and
         reformats schedule information.
@@ -115,14 +115,14 @@ class TransferJobPreprocessor:
         return self.body
 
     @staticmethod
-    def _convert_date_to_dict(field_date):
+    def _convert_date_to_dict(field_date: date) -> dict:
         """
         Convert native python ``datetime.date`` object  to a format supported by the API
         """
         return {DAY: field_date.day, MONTH: field_date.month, YEAR: field_date.year}
 
     @staticmethod
-    def _convert_time_to_dict(time_object):
+    def _convert_time_to_dict(time_object: time) -> dict:
         """
         Convert native python ``datetime.time`` object  to a format supported by the API
         """
@@ -140,7 +140,7 @@ class TransferJobValidator:
 
         self.body = body
 
-    def _verify_data_source(self):
+    def _verify_data_source(self) -> None:
         is_gcs = GCS_DATA_SOURCE in self.body[TRANSFER_SPEC]
         is_aws_s3 = AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]
         is_http = HTTP_DATA_SOURCE in self.body[TRANSFER_SPEC]
@@ -152,7 +152,7 @@ class TransferJobValidator:
                 "gcsDataSource, awsS3DataSource and httpDataSource."
             )
 
-    def _restrict_aws_credentials(self):
+    def _restrict_aws_credentials(self) -> None:
         aws_transfer = AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]
         if aws_transfer and AWS_ACCESS_KEY in self.body[TRANSFER_SPEC][AWS_S3_DATA_SOURCE]:
             raise AirflowException(
@@ -160,7 +160,7 @@ class TransferJobValidator:
                 "please use Airflow connections to store credentials."
             )
 
-    def validate_body(self):
+    def validate_body(self) -> None:
         """
         Validates the body. Checks if body specifies `transferSpec`
         if yes, then check if AWS credentials are passed correctly and
@@ -247,10 +247,10 @@ class CloudDataTransferServiceCreateJobOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         TransferJobValidator(body=self.body).validate_body()
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
@@ -329,12 +329,12 @@ class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         TransferJobValidator(body=self.body).validate_body()
         if not self.job_name:
             raise AirflowException("The required parameter 'job_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
@@ -405,11 +405,11 @@ class CloudDataTransferServiceDeleteJobOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.job_name:
             raise AirflowException("The required parameter 'job_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         self._validate_inputs()
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
@@ -471,11 +471,11 @@ class CloudDataTransferServiceGetOperationOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.operation_name:
             raise AirflowException("The required parameter 'operation_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
             gcp_conn_id=self.gcp_conn_id,
@@ -545,11 +545,11 @@ class CloudDataTransferServiceListOperationsOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.filter:
             raise AirflowException("The required parameter 'filter' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> List[dict]:
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
             gcp_conn_id=self.gcp_conn_id,
@@ -611,11 +611,11 @@ class CloudDataTransferServicePauseOperationOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.operation_name:
             raise AirflowException("The required parameter 'operation_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
             gcp_conn_id=self.gcp_conn_id,
@@ -675,11 +675,11 @@ class CloudDataTransferServiceResumeOperationOperator(BaseOperator):
         self._validate_inputs()
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.operation_name:
             raise AirflowException("The required parameter 'operation_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
             gcp_conn_id=self.gcp_conn_id,
@@ -740,11 +740,11 @@ class CloudDataTransferServiceCancelOperationOperator(BaseOperator):
         self.google_impersonation_chain = google_impersonation_chain
         self._validate_inputs()
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.operation_name:
             raise AirflowException("The required parameter 'operation_name' is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDataTransferServiceHook(
             api_version=self.api_version,
             gcp_conn_id=self.gcp_conn_id,
@@ -868,7 +868,7 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
         self.timeout = timeout
         self.google_impersonation_chain = google_impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDataTransferServiceHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -883,7 +883,7 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
         if self.wait:
             hook.wait_for_transfer_job(job, timeout=self.timeout)
 
-    def _create_body(self):
+    def _create_body(self) -> dict:
         body = {
             DESCRIPTION: self.description,
             STATUS: GcpTransferJobsStatus.ENABLED,
@@ -900,10 +900,10 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
             body[SCHEDULE] = self.schedule
 
         if self.object_conditions is not None:
-            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions  # type: ignore[index]
 
         if self.transfer_options is not None:
-            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options
+            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options  # type: ignore[index]
 
         return body
 
@@ -1023,7 +1023,7 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
         self.timeout = timeout
         self.google_impersonation_chain = google_impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDataTransferServiceHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -1039,7 +1039,7 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
         if self.wait:
             hook.wait_for_transfer_job(job, timeout=self.timeout)
 
-    def _create_body(self):
+    def _create_body(self) -> dict:
         body = {
             DESCRIPTION: self.description,
             STATUS: GcpTransferJobsStatus.ENABLED,
@@ -1056,9 +1056,9 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
             body[SCHEDULE] = self.schedule
 
         if self.object_conditions is not None:
-            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions
+            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions  # type: ignore[index]
 
         if self.transfer_options is not None:
-            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options
+            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options  # type: ignore[index]
 
         return body
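
The # type: ignore[index] comments in _create_body work around inference on the body literal: mixing string values with a nested dict makes mypy infer the value type as object, and an object-typed value cannot be used as an indexed-assignment target when the optional keys are attached afterwards. Reduced sketch:

    # Hypothetical reduction of _create_body above. The literal mixes str and
    # dict values, so mypy infers the body as Dict[str, object]; assigning
    # into one of the object-typed values then fails with:
    #   Unsupported target for indexed assignment ("object")  [index]
    from typing import Optional

    object_conditions: Optional[dict] = {"minTimeElapsedSinceLastModification": "60s"}

    body = {
        "description": "example transfer job",
        "transferSpec": {},
    }

    if object_conditions is not None:
        body["transferSpec"]["objectConditions"] = object_conditions  # type: ignore[index]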
diff --git a/airflow/providers/google/cloud/operators/compute.py b/airflow/providers/google/cloud/operators/compute.py
index 5d66642..d5bc4dc 100644
--- a/airflow/providers/google/cloud/operators/compute.py
+++ b/airflow/providers/google/cloud/operators/compute.py
@@ -59,7 +59,7 @@ class ComputeEngineBaseOperator(BaseOperator):
         self._validate_inputs()
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is missing")
         if not self.zone:
@@ -139,7 +139,7 @@ class ComputeEngineStartInstanceOperator(ComputeEngineBaseOperator):
             **kwargs,
         )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = ComputeEngineHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -216,7 +216,7 @@ class ComputeEngineStopInstanceOperator(ComputeEngineBaseOperator):
             **kwargs,
         )
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = ComputeEngineHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -312,11 +312,11 @@ class ComputeEngineSetMachineTypeOperator(ComputeEngineBaseOperator):
             **kwargs,
         )
 
-    def _validate_all_body_fields(self):
+    def _validate_all_body_fields(self) -> None:
         if self._field_validator:
             self._field_validator.validate(self.body)
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = ComputeEngineHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -482,11 +482,11 @@ class ComputeEngineCopyInstanceTemplateOperator(ComputeEngineBaseOperator):
             **kwargs,
         )
 
-    def _validate_all_body_fields(self):
+    def _validate_all_body_fields(self) -> None:
         if self._field_validator:
             self._field_validator.validate(self.body_patch)
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = ComputeEngineHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -623,12 +623,12 @@ class ComputeEngineInstanceGroupUpdateManagerTemplateOperator(ComputeEngineBaseO
             **kwargs,
         )
 
-    def _possibly_replace_template(self, dictionary: Dict) -> None:
+    def _possibly_replace_template(self, dictionary: dict) -> None:
         if dictionary.get('instanceTemplate') == self.source_template:
             dictionary['instanceTemplate'] = self.destination_template
             self._change_performed = True
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         hook = ComputeEngineHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
diff --git a/airflow/providers/google/cloud/operators/datacatalog.py b/airflow/providers/google/cloud/operators/datacatalog.py
index 9264742..00b2765 100644
--- a/airflow/providers/google/cloud/operators/datacatalog.py
+++ b/airflow/providers/google/cloud/operators/datacatalog.py
@@ -124,7 +124,7 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -240,7 +240,7 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -362,7 +362,7 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -489,7 +489,7 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -609,7 +609,7 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -718,7 +718,7 @@ class CloudDataCatalogDeleteEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -810,7 +810,7 @@ class CloudDataCatalogDeleteEntryGroupOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -909,7 +909,7 @@ class CloudDataCatalogDeleteTagOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1007,7 +1007,7 @@ class CloudDataCatalogDeleteTagTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1107,7 +1107,7 @@ class CloudDataCatalogDeleteTagTemplateFieldOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1203,7 +1203,7 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1299,7 +1299,7 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1387,7 +1387,7 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1487,7 +1487,7 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> list:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1578,7 +1578,7 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1676,7 +1676,7 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1796,7 +1796,7 @@ class CloudDataCatalogSearchCatalogOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> list:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -1906,7 +1906,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -2020,7 +2020,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -2132,7 +2132,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
@@ -2254,7 +2254,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 6bdd907..f644008 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -227,14 +227,14 @@ class DataflowCreateJavaJobOperator(BaseOperator):
         dataflow_options.update(self.options)
         is_running = False
         if self.check_if_running != CheckJobRunning.IgnoreJob:
-            is_running = self.hook.is_job_dataflow_running(
+            is_running = self.hook.is_job_dataflow_running(  # type: ignore[attr-defined]
                 name=self.job_name,
                 variables=dataflow_options,
                 project_id=self.project_id,
                 location=self.location,
             )
             while is_running and self.check_if_running == CheckJobRunning.WaitForRun:
-                is_running = self.hook.is_job_dataflow_running(
+                is_running = self.hook.is_job_dataflow_running(  # type: ignore[attr-defined]
                     name=self.job_name,
                     variables=dataflow_options,
                     project_id=self.project_id,
@@ -253,7 +253,7 @@ class DataflowCreateJavaJobOperator(BaseOperator):
                 def set_current_job_id(job_id):
                     self.job_id = job_id
 
-                self.hook.start_java_dataflow(
+                self.hook.start_java_dataflow(  # type: ignore[attr-defined]
                     job_name=self.job_name,
                     variables=dataflow_options,
                     jar=self.jar,
@@ -419,7 +419,7 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self.environment = environment
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         self.hook = DataflowHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -585,7 +585,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
             def set_current_job_id(job_id):
                 self.job_id = job_id
 
-            self.hook.start_python_dataflow(
+            self.hook.start_python_dataflow(  # type: ignore[attr-defined]
                 job_name=self.job_name,
                 variables=formatted_options,
                 dataflow=self.py_file,
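
The dataflow hunks suppress attr-defined rather than union-attr: here self.hook is first assigned as a bare None without an annotation, so mypy infers the attribute's type as None and reports that None has no such attribute at every later call. Reduced sketch (defining the class is fine; calling run() before a hook is attached would fail at runtime, which is exactly what the checker is complaining about):

    # Hypothetical reduction of the dataflow pattern above. Without an
    # annotation, `self.hook = None` makes mypy infer the attribute type as
    # None, so each later access reports:
    #   "None" has no attribute "start_java_dataflow"  [attr-defined]


    class Operator:
        def __init__(self) -> None:
            self.hook = None  # inferred type: None

        def run(self) -> None:
            self.hook.start_java_dataflow()  # type: ignore[attr-defined]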
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index 058e81f..c02fb94 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -90,7 +90,7 @@ class CloudDataFusionRestartInstanceOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -167,7 +167,7 @@ class CloudDataFusionDeleteInstanceOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -250,7 +250,7 @@ class CloudDataFusionCreateInstanceOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -360,7 +360,7 @@ class CloudDataFusionUpdateInstanceOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -439,7 +439,7 @@ class CloudDataFusionGetInstanceOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -529,7 +529,7 @@ class CloudDataFusionCreatePipelineOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -626,7 +626,7 @@ class CloudDataFusionDeletePipelineOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -724,7 +724,7 @@ class CloudDataFusionListPipelinesOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -832,7 +832,7 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -934,7 +934,7 @@ class CloudDataFusionStopPipelineOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataFusionHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
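
These hunks also swap typing.Dict for the builtin dict in the context
annotation. Unparameterized, the two are interchangeable to mypy, which is
why the dataprep module below can drop its typing.Dict import entirely. As
a minimal sketch (illustrative names, not Airflow code) of what the added
return annotations buy under strict checking:

    def restart_instance(context: dict) -> None:
        return {"name": "instance"}  # mypy: error: No return value expected

At runtime the annotation changes nothing; the gain is that mypy can now
verify every execute body against its declared return type.
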
diff --git a/airflow/providers/google/cloud/operators/dataprep.py b/airflow/providers/google/cloud/operators/dataprep.py
index 8d08c5a..e53ba84 100644
--- a/airflow/providers/google/cloud/operators/dataprep.py
+++ b/airflow/providers/google/cloud/operators/dataprep.py
@@ -19,8 +19,6 @@
 This module contains a Google Dataprep operator.
 """
 
-from typing import Dict
-
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.dataprep import GoogleDataprepHook
 from airflow.utils.decorators import apply_defaults
@@ -47,7 +45,7 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
         self.dataprep_conn_id = dataprep_conn_id
         self.job_id = job_id
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
         hook = GoogleDataprepHook(
             dataprep_conn_id="dataprep_default",
@@ -92,7 +90,7 @@ class DataprepGetJobGroupOperator(BaseOperator):
         self.embed = embed
         self.include_deleted = include_deleted
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> dict:
         self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
         hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
         response = hook.get_job_group(
@@ -121,7 +119,7 @@ class DataprepRunJobGroupOperator(BaseOperator):
         self.body_request = body_request
         self.dataprep_conn_id = dataprep_conn_id
 
-    def execute(self, context: None):
+    def execute(self, context: dict) -> dict:
         self.log.info("Creating a job...")
         hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
         response = hook.run_job_group(body_request=self.body_request)
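
The -> dict annotations also document what lands in XCom: BaseOperator
pushes the value returned by execute under the return_value key when
do_xcom_push is enabled. A hypothetical downstream task (the task id and
callable here are illustrative, not part of this change) could consume the
Dataprep response like so:

    from airflow.operators.python import PythonOperator

    def summarize(ti):
        # the dict returned by DataprepGetJobsForJobGroupOperator.execute
        jobs = ti.xcom_pull(task_ids="get_jobs_for_job_group")
        print("fetched %d entries" % len(jobs))

    summarize_task = PythonOperator(task_id="summarize", python_callable=summarize)
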
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 03efe00..c6fcdc0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -229,7 +229,7 @@ class ClusterGenerator:
         if self.single_node and self.num_preemptible_workers > 0:
             raise ValueError("Single node cannot have preemptible workers.")
 
-    def _get_init_action_timeout(self):
+    def _get_init_action_timeout(self) -> dict:
         match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
         if match:
             val = float(match.group(1))
@@ -553,7 +553,7 @@ class DataprocCreateClusterOperator(BaseOperator):
         self.log.info("Deleting the cluster")
         hook.delete_cluster(region=self.region, cluster_name=self.cluster_name, project_id=self.project_id)
 
-    def _get_cluster(self, hook: DataprocHook):
+    def _get_cluster(self, hook: DataprocHook) -> Cluster:
         return hook.get_cluster(
             project_id=self.project_id,
             region=self.region,
@@ -601,7 +601,7 @@ class DataprocCreateClusterOperator(BaseOperator):
             cluster = self._get_cluster(hook)
         return cluster
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
         try:
@@ -711,7 +711,7 @@ class DataprocScaleClusterOperator(BaseOperator):
             stacklevel=1,
         )
 
-    def _build_scale_cluster_data(self):
+    def _build_scale_cluster_data(self) -> dict:
         scale_data = {
             'config': {
                 'worker_config': {'num_instances': self.num_workers},
@@ -749,7 +749,7 @@ class DataprocScaleClusterOperator(BaseOperator):
 
         return {'seconds': timeout}
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         """
         Scale, up or down, a cluster on Google Cloud Dataproc.
         """
@@ -839,7 +839,7 @@ class DataprocDeleteClusterOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
         self.log.info("Deleting cluster: %s", self.cluster_name)
         operation = hook.delete_cluster(
@@ -974,7 +974,7 @@ class DataprocJobBaseOperator(BaseOperator):
         self.job_template.add_jar_file_uris(self.dataproc_jars)
         self.job_template.add_labels(self.labels)
 
-    def _generate_job_template(self):
+    def _generate_job_template(self) -> dict:
         if self.job_template:
             job = self.job_template.build()
             return job['job']
@@ -999,7 +999,7 @@ class DataprocJobBaseOperator(BaseOperator):
         else:
             raise AirflowException("Create a job template before")
 
-    def on_kill(self):
+    def on_kill(self) -> None:
         """
         Callback called when the operator is killed.
         Cancel any running job.
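
The newly annotated _get_init_action_timeout turns strings such as "300s"
or "10m" into the {'seconds': ...} mapping the Dataproc API expects. A
standalone sketch of that parsing, assuming the regex shown above is the
whole contract:

    import re

    def parse_init_action_timeout(value: str) -> dict:
        """Convert '300s' or '10m' into {'seconds': <int>}."""
        match = re.match(r"^(\d+)([sm])$", value)
        if not match:
            raise ValueError("Unsupported duration: %r" % value)
        val = float(match.group(1))
        if match.group(2) == "m":
            val *= 60
        return {"seconds": int(val)}

    assert parse_init_action_timeout("10m") == {"seconds": 600}
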
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 91e50f4..30f4548 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -113,7 +113,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         if kwargs.get('xcom_push') is not None:
             raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         self.log.info('Exporting data to Cloud Storage bucket %s', self.bucket)
 
         if self.overwrite_existing and self.namespace:
@@ -305,7 +305,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> list:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -373,7 +373,7 @@ class CloudDatastoreBeginTransactionOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> str:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -441,7 +441,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -509,7 +509,7 @@ class CloudDatastoreRollbackOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -576,7 +576,7 @@ class CloudDatastoreRunQueryOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -693,7 +693,7 @@ class CloudDatastoreDeleteOperationOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = DatastoreHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
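
The spread of return annotations in this file (-> dict, -> list, -> str,
-> None) mirrors what the corresponding DatastoreHook calls return, which
is the real payoff of strict typing: once the hook is annotated, mypy can
check each operator against it. A self-contained sketch of that
propagation (stub classes, not Airflow code):

    class StubDatastoreHook:
        def begin_transaction(self, project_id: str) -> str:
            return "tx-handle"

    class StubBeginTransactionOperator:
        def execute(self, context: dict) -> str:
            hook = StubDatastoreHook()
            # mypy verifies the hook's declared str against our -> str
            return hook.begin_transaction("example-project")
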
diff --git a/airflow/providers/google/cloud/operators/dlp.py b/airflow/providers/google/cloud/operators/dlp.py
index b2623a5..5d28d66 100644
--- a/airflow/providers/google/cloud/operators/dlp.py
+++ b/airflow/providers/google/cloud/operators/dlp.py
@@ -112,7 +112,7 @@ class CloudDLPCancelDLPJobOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDLPHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -749,7 +749,7 @@ class CloudDLPDeidentifyContentOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = CloudDLPHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -839,7 +839,7 @@ class CloudDLPDeleteDeidentifyTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDLPHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -923,7 +923,7 @@ class CloudDLPDeleteDLPJobOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDLPHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -1011,7 +1011,7 @@ class CloudDLPDeleteInspectTemplateOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudDLPHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/functions.py b/airflow/providers/google/cloud/operators/functions.py
index a2a2043..7066e1b 100644
--- a/airflow/providers/google/cloud/operators/functions.py
+++ b/airflow/providers/google/cloud/operators/functions.py
@@ -180,24 +180,24 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
         self._validate_inputs()
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.location:
             raise AirflowException("The required parameter 'location' is missing")
         if not self.body:
             raise AirflowException("The required parameter 'body' is missing")
         self.zip_path_preprocessor.preprocess_body()
 
-    def _validate_all_body_fields(self):
+    def _validate_all_body_fields(self) -> None:
         if self._field_validator:
             self._field_validator.validate(self.body)
 
-    def _create_new_function(self, hook):
+    def _create_new_function(self, hook) -> None:
         hook.create_new_function(project_id=self.project_id, location=self.location, body=self.body)
 
-    def _update_function(self, hook):
+    def _update_function(self, hook) -> None:
         hook.update_function(self.body['name'], self.body, self.body.keys())
 
-    def _check_if_function_exists(self, hook):
+    def _check_if_function_exists(self, hook) -> bool:
         name = self.body.get('name')
         if not name:
             raise GcpFieldValidationException(
@@ -217,7 +217,7 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
             project_id=self.project_id, location=self.location, zip_path=self.zip_path
         )
 
-    def _set_airflow_version_label(self):
+    def _set_airflow_version_label(self) -> None:
         if 'labels' not in self.body.keys():
             self.body['labels'] = {}
         self.body['labels'].update({'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
@@ -274,10 +274,10 @@ class ZipPathPreprocessor:
         self.zip_path = zip_path
 
     @staticmethod
-    def _is_present_and_empty(dictionary, field):
+    def _is_present_and_empty(dictionary, field) -> bool:
         return field in dictionary and not dictionary[field]
 
-    def _verify_upload_url_and_no_zip_path(self):
+    def _verify_upload_url_and_no_zip_path(self) -> None:
         if self._is_present_and_empty(self.body, GCF_SOURCE_UPLOAD_URL):
             if not self.zip_path:
                 raise AirflowException(
@@ -286,7 +286,7 @@ class ZipPathPreprocessor:
                     "when '{url}' is present and empty.".format(url=GCF_SOURCE_UPLOAD_URL, path=GCF_ZIP_PATH)
                 )
 
-    def _verify_upload_url_and_zip_path(self):
+    def _verify_upload_url_and_zip_path(self) -> None:
         if GCF_SOURCE_UPLOAD_URL in self.body and self.zip_path:
             if not self.body[GCF_SOURCE_UPLOAD_URL]:
                 self.upload_function = True
@@ -296,7 +296,7 @@ class ZipPathPreprocessor:
                     "allowed. Found both.".format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH)
                 )
 
-    def _verify_archive_url_and_zip_path(self):
+    def _verify_archive_url_and_zip_path(self) -> None:
         if GCF_SOURCE_ARCHIVE_URL in self.body and self.zip_path:
             raise AirflowException(
                 "Only one of '{}' in body or '{}' argument "
@@ -313,7 +313,7 @@ class ZipPathPreprocessor:
             raise AirflowException('preprocess_body() method has to be invoked before should_upload_function')
         return self.upload_function
 
-    def preprocess_body(self):
+    def preprocess_body(self) -> None:
         """
         Modifies sourceUploadUrl body field in special way when zip_path
         is not empty.
@@ -381,7 +381,7 @@ class CloudFunctionDeleteFunctionOperator(BaseOperator):
         self._validate_inputs()
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.name:
             raise AttributeError('Empty parameter: name')
         else:
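
_is_present_and_empty drives the mutually exclusive sourceUploadUrl /
zip_path rules above. Its behaviour in isolation, as a runnable excerpt:

    def is_present_and_empty(dictionary: dict, field: str) -> bool:
        return field in dictionary and not dictionary[field]

    assert is_present_and_empty({"sourceUploadUrl": ""}, "sourceUploadUrl")
    assert not is_present_and_empty({"sourceUploadUrl": "gs://x"}, "sourceUploadUrl")
    assert not is_present_and_empty({}, "sourceUploadUrl")
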
diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index fea3eda..a4c38b0 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -151,7 +151,7 @@ class GCSCreateBucketOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = GCSHook(
             google_cloud_storage_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -258,7 +258,7 @@ class GCSListObjectsOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> list:
 
         hook = GCSHook(
             google_cloud_storage_conn_id=self.gcp_conn_id,
@@ -445,7 +445,7 @@ class GCSBucketCreateAclEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = GCSHook(
             google_cloud_storage_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -544,7 +544,7 @@ class GCSObjectCreateAclEntryOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = GCSHook(
             google_cloud_storage_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -624,7 +624,7 @@ class GCSFileTransformOperator(BaseOperator):
         self.output_encoding = sys.getdefaultencoding()
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Dict):
+    def execute(self, context: dict) -> None:
         hook = GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
 
         with NamedTemporaryFile() as source_file, NamedTemporaryFile() as destination_file:
@@ -707,7 +707,7 @@ class GCSDeleteBucketOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
         hook.delete_bucket(bucket_name=self.bucket_name, force=self.force)
 
@@ -805,7 +805,7 @@ class GCSSynchronizeBucketsOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = GCSHook(
             google_cloud_storage_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
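
In the GCSFileTransformOperator hunk, the annotated execute downloads the
source object into one NamedTemporaryFile, runs transform_script, and
uploads the other file. Assuming the script receives the source and
destination paths as its two trailing arguments (which the temp-file
handling implies), a minimal transform script could be:

    #!/usr/bin/env python
    """Uppercase a file: transform.py <source> <destination> (illustrative)."""
    import sys

    source_path, destination_path = sys.argv[1], sys.argv[2]
    with open(source_path) as src, open(destination_path, "w") as dst:
        for line in src:
            dst.write(line.upper())
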
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index be24fda..5212fba 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -111,12 +111,12 @@ class GKEDeleteClusterOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self._check_input()
 
-    def _check_input(self):
+    def _check_input(self) -> None:
         if not all([self.project_id, self.name, self.location]):
             self.log.error('One of (project_id, name, location) is missing or incorrect')
             raise AirflowException('Operator has incorrect or missing input.')
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[str]:
         hook = GKEHook(
             gcp_conn_id=self.gcp_conn_id,
             location=self.location,
@@ -214,7 +214,7 @@ class GKECreateClusterOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self._check_input()
 
-    def _check_input(self):
+    def _check_input(self) -> None:
         if not all([self.project_id, self.location, self.body]) or not (
             (isinstance(self.body, dict) and "name" in self.body and "initial_node_count" in self.body)
             or (getattr(self.body, "name", None) and getattr(self.body, "initial_node_count", None))
@@ -225,7 +225,7 @@ class GKECreateClusterOperator(BaseOperator):
             )
             raise AirflowException("Operator has incorrect or missing input.")
 
-    def execute(self, context):
+    def execute(self, context) -> str:
         hook = GKEHook(
             gcp_conn_id=self.gcp_conn_id,
             location=self.location,
@@ -299,7 +299,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
                 "called `google_cloud_default`.",
             )
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[str]:
         hook = GoogleBaseHook(gcp_conn_id=self.gcp_conn_id)
         self.project_id = self.project_id or hook.project_id
 
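
Note that _check_input validates with all([...]), so any falsy value, an
empty string as much as None, fails the check. A small sketch of the same
guard outside Airflow:

    def check_input(project_id: str, name: str, location: str) -> None:
        if not all([project_id, name, location]):
            raise ValueError("Operator has incorrect or missing input.")

    check_input("my-project", "my-cluster", "europe-west1")  # passes
    # check_input("my-project", "", "europe-west1")          # raises
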
diff --git a/airflow/providers/google/cloud/operators/life_sciences.py b/airflow/providers/google/cloud/operators/life_sciences.py
index c4d9cd7..0c82927 100644
--- a/airflow/providers/google/cloud/operators/life_sciences.py
+++ b/airflow/providers/google/cloud/operators/life_sciences.py
@@ -83,13 +83,13 @@ class LifeSciencesRunPipelineOperator(BaseOperator):
         self._validate_inputs()
         self.impersonation_chain = impersonation_chain
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if not self.body:
             raise AirflowException("The required parameter 'body' is missing")
         if not self.location:
             raise AirflowException("The required parameter 'location' is missing")
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = LifeSciencesHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py
index 544b615..e539e6d 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -172,7 +172,7 @@ class PubSubCreateTopicOperator(BaseOperator):
         self.metadata = metadata
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -421,7 +421,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
         self.metadata = metadata
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> str:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -566,7 +566,7 @@ class PubSubDeleteTopicOperator(BaseOperator):
         self.metadata = metadata
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -699,7 +699,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
         self.metadata = metadata
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -820,7 +820,7 @@ class PubSubPublishMessageOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -919,7 +919,7 @@ class PubSubPullOperator(BaseOperator):
         self.messages_callback = messages_callback
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> list:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -950,7 +950,7 @@ class PubSubPullOperator(BaseOperator):
         self,
         pulled_messages: List[ReceivedMessage],
         context: Dict[str, Any],  # pylint: disable=unused-argument
-    ):
+    ) -> list:
         """
         This method can be overridden by subclasses or by `messages_callback` constructor argument.
         This default implementation converts `ReceivedMessage` objects into JSON-serializable dicts.
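
_default_message_callback now advertises -> list, and the same contract
applies to a user-supplied messages_callback (stored above). A hedged
sketch of a custom callback that returns decoded payload strings instead
of the default JSON-serializable dicts:

    from typing import Any, Dict, List

    from google.cloud.pubsub_v1.types import ReceivedMessage

    def decode_payloads(
        pulled_messages: List[ReceivedMessage],
        context: Dict[str, Any],
    ) -> list:
        # Pub/Sub payloads are bytes; hand them on as UTF-8 strings
        return [m.message.data.decode("utf-8") for m in pulled_messages]

    # wired up as: PubSubPullOperator(..., messages_callback=decode_payloads)
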
diff --git a/airflow/providers/google/cloud/operators/spanner.py b/airflow/providers/google/cloud/operators/spanner.py
index 9f14cca..12019bd 100644
--- a/airflow/providers/google/cloud/operators/spanner.py
+++ b/airflow/providers/google/cloud/operators/spanner.py
@@ -98,13 +98,13 @@ class SpannerDeployInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
             raise AirflowException("The required parameter 'instance_id' " "is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = SpannerHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -177,13 +177,13 @@ class SpannerDeleteInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
             raise AirflowException("The required parameter 'instance_id' " "is empty or None")
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         hook = SpannerHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -263,7 +263,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
@@ -297,7 +297,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
         )
 
     @staticmethod
-    def sanitize_queries(queries):
+    def sanitize_queries(queries: List[str]) -> None:
         """
         Drops empty query in queries.
 
@@ -373,7 +373,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
@@ -381,7 +381,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
         if not self.database_id:
             raise AirflowException("The required parameter 'database_id' is empty" " or None")
 
-    def execute(self, context):
+    def execute(self, context) -> Optional[bool]:
         hook = SpannerHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -480,7 +480,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
@@ -490,7 +490,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
         if not self.ddl_statements:
             raise AirflowException("The required parameter 'ddl_statements' is empty" " or None")
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = SpannerHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
@@ -570,7 +570,7 @@ class SpannerDeleteDatabaseInstanceOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.project_id == '':
             raise AirflowException("The required parameter 'project_id' is empty")
         if not self.instance_id:
@@ -578,7 +578,7 @@ class SpannerDeleteDatabaseInstanceOperator(BaseOperator):
         if not self.database_id:
             raise AirflowException("The required parameter 'database_id' is empty" " or None")
 
-    def execute(self, context):
+    def execute(self, context) -> bool:
         hook = SpannerHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
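
sanitize_queries is annotated -> None because it mutates the list in
place, dropping the empty trailing statement that str.split(';') produces
for SQL ending in a semicolon. Assuming that is the full behaviour the
docstring describes:

    queries = "DELETE FROM users WHERE id > 100;".split(";")
    # ['DELETE FROM users WHERE id > 100', '']

    if queries and queries[-1] == "":  # mirrors the sanitization
        del queries[-1]
    # ['DELETE FROM users WHERE id > 100']
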
diff --git a/airflow/providers/google/cloud/operators/speech_to_text.py b/airflow/providers/google/cloud/operators/speech_to_text.py
index 148cd28..b8c7933 100644
--- a/airflow/providers/google/cloud/operators/speech_to_text.py
+++ b/airflow/providers/google/cloud/operators/speech_to_text.py
@@ -102,7 +102,7 @@ class CloudSpeechToTextRecognizeSpeechOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         if self.audio == "":
             raise AirflowException("The required parameter 'audio' is empty")
         if self.config == "":
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index 517fe5a..dc86466 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -108,7 +108,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.format_ = format_
         self.filter_ = filter_
@@ -212,7 +212,7 @@ class StackdriverEnableAlertPoliciesOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.gcp_conn_id = gcp_conn_id
         self.project_id = project_id
@@ -303,7 +303,7 @@ class StackdriverDisableAlertPoliciesOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.gcp_conn_id = gcp_conn_id
         self.project_id = project_id
@@ -396,7 +396,7 @@ class StackdriverUpsertAlertOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.alerts = alerts
         self.retry = retry
@@ -485,7 +485,7 @@ class StackdriverDeleteAlertOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.name = name
         self.retry = retry
@@ -597,7 +597,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.format_ = format_
         self.filter_ = filter_
@@ -701,7 +701,7 @@ class StackdriverEnableNotificationChannelsOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.filter_ = filter_
         self.retry = retry
@@ -794,7 +794,7 @@ class StackdriverDisableNotificationChannelsOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.filter_ = filter_
         self.retry = retry
@@ -889,7 +889,7 @@ class StackdriverUpsertNotificationChannelOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.channels = channels
         self.retry = retry
@@ -980,7 +980,7 @@ class StackdriverDeleteNotificationChannelOperator(BaseOperator):
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
-    ):
+    ) -> None:
         super().__init__(**kwargs)
         self.name = name
         self.retry = retry
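
Every __init__ in this file gains an explicit -> None. Under mypy's
--disallow-incomplete-defs (included in --strict), a method whose
parameters are annotated but whose return type is not is reported as
"Function is missing a return type annotation", and PEP 484 expects
__init__ to be annotated -> None explicitly:

    class Incomplete:
        def __init__(self, filter_: str):  # flagged by mypy --strict
            self.filter_ = filter_

    class Complete:
        def __init__(self, filter_: str) -> None:  # accepted and checked
            self.filter_ = filter_
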
diff --git a/airflow/providers/google/cloud/operators/text_to_speech.py b/airflow/providers/google/cloud/operators/text_to_speech.py
index 21e3a7a..16dfe1b 100644
--- a/airflow/providers/google/cloud/operators/text_to_speech.py
+++ b/airflow/providers/google/cloud/operators/text_to_speech.py
@@ -119,7 +119,7 @@ class CloudTextToSpeechSynthesizeOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         super().__init__(**kwargs)
 
-    def _validate_inputs(self):
+    def _validate_inputs(self) -> None:
         for parameter in [
             "input_data",
             "voice",
@@ -130,7 +130,7 @@ class CloudTextToSpeechSynthesizeOperator(BaseOperator):
             if getattr(self, parameter) == "":
                 raise AirflowException("The required parameter '{}' is empty".format(parameter))
 
-    def execute(self, context):
+    def execute(self, context) -> None:
         hook = CloudTextToSpeechHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
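
_validate_inputs here iterates over attribute names with getattr, a
compact alternative to the field-by-field checks elsewhere in this commit.
A generic sketch of the same idea (names illustrative):

    def validate_required(obj: object, parameters: list) -> None:
        for parameter in parameters:
            if getattr(obj, parameter) == "":
                raise ValueError("The required parameter '%s' is empty" % parameter)
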
diff --git a/airflow/providers/google/cloud/operators/translate.py b/airflow/providers/google/cloud/operators/translate.py
index c931c6f..bfc33be 100644
--- a/airflow/providers/google/cloud/operators/translate.py
+++ b/airflow/providers/google/cloud/operators/translate.py
@@ -118,7 +118,7 @@ class CloudTranslateTextOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         hook = CloudTranslateHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/translate_speech.py b/airflow/providers/google/cloud/operators/translate_speech.py
index 8f8380b..ae75681 100644
--- a/airflow/providers/google/cloud/operators/translate_speech.py
+++ b/airflow/providers/google/cloud/operators/translate_speech.py
@@ -146,7 +146,7 @@ class CloudTranslateSpeechOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context):
+    def execute(self, context) -> dict:
         speech_to_text_hook = CloudSpeechToTextHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
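
With these annotations in place, the strict check that motivated the
commit can be reproduced locally with an invocation along the lines of
"mypy --strict airflow/providers/google/cloud/operators/"; the exact flags
and pre-commit wiring in the project's CI may differ.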