You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/12 10:24:43 UTC
[airflow] branch master updated: Google cloud operator strict type
check (#11450)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 06141d6 Google cloud operator strict type check (#11450)
06141d6 is described below
commit 06141d6d01398115e5e54c5766a46ae5514ba2f7
Author: Satyasheel <ml...@users.noreply.github.com>
AuthorDate: Mon Oct 12 11:23:35 2020 +0100
Google cloud operator strict type check (#11450)
import optimisation
---
.../providers/google/cloud/operators/bigquery.py | 31 +++++----
.../google/cloud/operators/bigquery_dts.py | 8 +--
.../providers/google/cloud/operators/bigtable.py | 14 ++--
.../google/cloud/operators/cloud_build.py | 14 ++--
.../google/cloud/operators/cloud_memorystore.py | 22 +++----
.../providers/google/cloud/operators/cloud_sql.py | 48 +++++++-------
.../operators/cloud_storage_transfer_service.py | 74 +++++++++++-----------
.../providers/google/cloud/operators/compute.py | 18 +++---
.../google/cloud/operators/datacatalog.py | 42 ++++++------
.../providers/google/cloud/operators/dataflow.py | 10 +--
.../providers/google/cloud/operators/datafusion.py | 20 +++---
.../providers/google/cloud/operators/dataprep.py | 8 +--
.../providers/google/cloud/operators/dataproc.py | 16 ++---
.../providers/google/cloud/operators/datastore.py | 14 ++--
airflow/providers/google/cloud/operators/dlp.py | 10 +--
.../providers/google/cloud/operators/functions.py | 24 +++----
airflow/providers/google/cloud/operators/gcs.py | 14 ++--
.../google/cloud/operators/kubernetes_engine.py | 10 +--
.../google/cloud/operators/life_sciences.py | 4 +-
airflow/providers/google/cloud/operators/pubsub.py | 14 ++--
.../providers/google/cloud/operators/spanner.py | 24 +++----
.../google/cloud/operators/speech_to_text.py | 2 +-
.../google/cloud/operators/stackdriver.py | 20 +++---
.../google/cloud/operators/text_to_speech.py | 4 +-
.../providers/google/cloud/operators/translate.py | 2 +-
.../google/cloud/operators/translate_speech.py | 2 +-
26 files changed, 237 insertions(+), 232 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 93e7db0..0686b33 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -26,6 +26,7 @@ import json
import re
import uuid
import warnings
+from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, SupportsAbs, Union
import attr
@@ -81,7 +82,7 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
def name(self) -> str:
return f'BigQuery Console #{self.index + 1}'
- def get_link(self, operator, dttm):
+ def get_link(self, operator: BaseOperator, dttm: datetime):
ti = TaskInstance(task=operator, execution_date=dttm)
job_ids = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
if not job_ids:
@@ -466,7 +467,7 @@ class BigQueryGetDataOperator(BaseOperator):
self.location = location
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> list:
self.log.info(
'Fetching Data from %s.%s max results: %s', self.dataset_id, self.table_id, self.max_results
)
@@ -741,7 +742,7 @@ class BigQueryExecuteQueryOperator(BaseOperator):
)
context['task_instance'].xcom_push(key='job_id', value=job_id)
- def on_kill(self):
+ def on_kill(self) -> None:
super().on_kill()
if self.hook is not None:
self.log.info('Cancelling running query')
@@ -931,7 +932,7 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
bq_hook = BigQueryHook(
gcp_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
@@ -946,7 +947,9 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- schema_fields = json.loads(gcs_hook.download(gcs_bucket, gcs_object).decode("utf-8"))
+ schema_fields = json.loads(
+ gcs_hook.download(gcs_bucket, gcs_object).decode("utf-8") # type: ignore[attr-defined]
+ ) # type: ignore[attr-defined]
else:
schema_fields = self.schema_fields
@@ -1172,7 +1175,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
self.location = location
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
bq_hook = BigQueryHook(
gcp_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
@@ -1187,7 +1190,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
impersonation_chain=self.impersonation_chain,
)
schema_object = gcs_hook.download(self.bucket, self.schema_object)
- schema_fields = json.loads(schema_object.decode("utf-8"))
+ schema_fields = json.loads(schema_object.decode("utf-8")) # type: ignore[attr-defined]
else:
schema_fields = self.schema_fields
@@ -1309,7 +1312,7 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
self.log.info('Dataset id: %s Project id: %s', self.dataset_id, self.project_id)
bq_hook = BigQueryHook(
@@ -1413,7 +1416,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
bq_hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -1828,7 +1831,7 @@ class BigQueryDeleteTableOperator(BaseOperator):
self.location = location
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
self.log.info('Deleting: %s', self.deletion_dataset_table)
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
@@ -1919,7 +1922,7 @@ class BigQueryUpsertTableOperator(BaseOperator):
self.location = location
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
self.log.info('Upserting Dataset: %s with table_resource: %s', self.dataset_id, self.table_resource)
hook = BigQueryHook(
bigquery_conn_id=self.gcp_conn_id,
@@ -2107,6 +2110,8 @@ class BigQueryInsertJobOperator(BaseOperator):
self.job_id = job.job_id
return job.job_id
- def on_kill(self):
+ def on_kill(self) -> None:
if self.job_id and self.cancel_on_kill:
- self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, location=self.location)
+ self.hook.cancel_job( # type: ignore[union-attr]
+ job_id=self.job_id, project_id=self.project_id, location=self.location
+ )
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py b/airflow/providers/google/cloud/operators/bigquery_dts.py
index 6bdba6e..1c401fc 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -88,7 +88,7 @@ class BigQueryCreateDataTransferOperator(BaseOperator):
gcp_conn_id="google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.transfer_config = transfer_config
self.authorization_code = authorization_code
@@ -172,7 +172,7 @@ class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
gcp_conn_id="google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.transfer_config_id = transfer_config_id
@@ -182,7 +182,7 @@ class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BiqQueryDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -265,7 +265,7 @@ class BigQueryDataTransferServiceStartTransferRunsOperator(BaseOperator):
gcp_conn_id="google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.transfer_config_id = transfer_config_id
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index f47f6be..ab9cf2e 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -153,7 +153,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -259,7 +259,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -335,7 +335,7 @@ class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -423,7 +423,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _compare_column_families(self, hook, instance):
+ def _compare_column_families(self, hook, instance) -> bool:
table_column_families = hook.get_column_families_for_table(instance, self.table_id)
if set(table_column_families.keys()) != set(self.column_families.keys()):
self.log.error("Table '%s' has different set of Column Families", self.table_id)
@@ -444,7 +444,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
return False
return True
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -533,7 +533,7 @@ class BigtableDeleteTableOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -619,7 +619,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = BigtableHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index 5d40042..5126e35 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -46,10 +46,10 @@ class BuildProcessor:
:type body: dict
"""
- def __init__(self, body: Dict) -> None:
+ def __init__(self, body: dict) -> None:
self.body = deepcopy(body)
- def _verify_source(self):
+ def _verify_source(self) -> None:
is_storage = "storageSource" in self.body["source"]
is_repo = "repoSource" in self.body["source"]
@@ -61,11 +61,11 @@ class BuildProcessor:
"storageSource and repoSource."
)
- def _reformat_source(self):
+ def _reformat_source(self) -> None:
self._reformat_repo_source()
self._reformat_storage_source()
- def _reformat_repo_source(self):
+ def _reformat_repo_source(self) -> None:
if "repoSource" not in self.body["source"]:
return
@@ -76,7 +76,7 @@ class BuildProcessor:
self.body["source"]["repoSource"] = self._convert_repo_url_to_dict(source)
- def _reformat_storage_source(self):
+ def _reformat_storage_source(self) -> None:
if "storageSource" not in self.body["source"]:
return
@@ -87,7 +87,7 @@ class BuildProcessor:
self.body["source"]["storageSource"] = self._convert_storage_url_to_dict(source)
- def process_body(self):
+ def process_body(self) -> dict:
"""
Processes the body passed in the constructor
@@ -228,7 +228,7 @@ class CloudBuildCreateBuildOperator(BaseOperator):
if self.body_raw.endswith('.json'):
self.body = json.loads(file.read())
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.body:
raise AirflowException("The required parameter 'body' is missing")
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index b151ea1..0600914 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -117,7 +117,7 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -204,7 +204,7 @@ class CloudMemorystoreDeleteInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -298,7 +298,7 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -391,7 +391,7 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -476,7 +476,7 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -572,7 +572,7 @@ class CloudMemorystoreImportOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -661,7 +661,7 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -771,7 +771,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -863,7 +863,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -978,7 +978,7 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1085,7 +1085,7 @@ class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
diff --git a/airflow/providers/google/cloud/operators/cloud_sql.py b/airflow/providers/google/cloud/operators/cloud_sql.py
index b3cf24e..162264d 100644
--- a/airflow/providers/google/cloud/operators/cloud_sql.py
+++ b/airflow/providers/google/cloud/operators/cloud_sql.py
@@ -232,13 +232,13 @@ class CloudSQLBaseOperator(BaseOperator):
self._validate_inputs()
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance:
raise AirflowException("The required parameter 'instance' is empty or None")
- def _check_if_instance_exists(self, instance, hook: CloudSQLHook):
+ def _check_if_instance_exists(self, instance, hook: CloudSQLHook) -> Union[dict, bool]:
try:
return hook.get_instance(project_id=self.project_id, instance=instance)
except HttpError as e:
@@ -247,7 +247,7 @@ class CloudSQLBaseOperator(BaseOperator):
return False
raise e
- def _check_if_db_exists(self, db_name, hook: CloudSQLHook):
+ def _check_if_db_exists(self, db_name, hook: CloudSQLHook) -> Union[dict, bool]:
try:
return hook.get_database(project_id=self.project_id, instance=self.instance, database=db_name)
except HttpError as e:
@@ -335,18 +335,18 @@ class CloudSQLCreateInstanceOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
- def _validate_body_fields(self):
+ def _validate_body_fields(self) -> None:
if self.validate_body:
GcpBodyFieldValidator(CLOUD_SQL_CREATE_VALIDATION, api_version=self.api_version).validate(
self.body
)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -435,7 +435,7 @@ class CloudSQLInstancePatchOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
@@ -513,7 +513,7 @@ class CloudSQLDeleteInstanceOperator(CloudSQLBaseOperator):
**kwargs,
)
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -594,18 +594,18 @@ class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
- def _validate_body_fields(self):
+ def _validate_body_fields(self) -> None:
if self.validate_body:
GcpBodyFieldValidator(
CLOUD_SQL_DATABASE_CREATE_VALIDATION, api_version=self.api_version
).validate(self.body)
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
self._validate_body_fields()
database = self.body.get("name")
if not database:
@@ -705,20 +705,20 @@ class CloudSQLPatchInstanceDatabaseOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
if not self.database:
raise AirflowException("The required parameter 'database' is empty")
- def _validate_body_fields(self):
+ def _validate_body_fields(self) -> None:
if self.validate_body:
GcpBodyFieldValidator(CLOUD_SQL_DATABASE_PATCH_VALIDATION, api_version=self.api_version).validate(
self.body
)
- def execute(self, context):
+ def execute(self, context) -> None:
self._validate_body_fields()
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
@@ -802,12 +802,12 @@ class CloudSQLDeleteInstanceDatabaseOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.database:
raise AirflowException("The required parameter 'database' is empty")
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -897,18 +897,18 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
- def _validate_body_fields(self):
+ def _validate_body_fields(self) -> None:
if self.validate_body:
GcpBodyFieldValidator(CLOUD_SQL_EXPORT_VALIDATION, api_version=self.api_version).validate(
self.body
)
- def execute(self, context):
+ def execute(self, context) -> None:
self._validate_body_fields()
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
@@ -1002,18 +1002,18 @@ class CloudSQLImportInstanceOperator(CloudSQLBaseOperator):
**kwargs,
)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.body:
raise AirflowException("The required parameter 'body' is empty")
- def _validate_body_fields(self):
+ def _validate_body_fields(self) -> None:
if self.validate_body:
GcpBodyFieldValidator(CLOUD_SQL_IMPORT_VALIDATION, api_version=self.api_version).validate(
self.body
)
- def execute(self, context):
+ def execute(self, context) -> None:
self._validate_body_fields()
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
@@ -1077,7 +1077,9 @@ class CloudSQLExecuteQueryOperator(BaseOperator):
self.parameters = parameters
self.gcp_connection = None
- def _execute_query(self, hook: CloudSQLDatabaseHook, database_hook: Union[PostgresHook, MySqlHook]):
+ def _execute_query(
+ self, hook: CloudSQLDatabaseHook, database_hook: Union[PostgresHook, MySqlHook]
+ ) -> None:
cloud_sql_proxy_runner = None
try:
if hook.use_proxy:
diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index dc70431..536ebb1 100644
--- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -21,7 +21,7 @@ This module contains Google Cloud Transfer operators.
"""
from copy import deepcopy
from datetime import date, time
-from typing import Dict, Optional, Sequence, Union
+from typing import Dict, Optional, Sequence, Union, List
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -67,32 +67,32 @@ class TransferJobPreprocessor:
self.aws_conn_id = aws_conn_id
self.default_schedule = default_schedule
- def _inject_aws_credentials(self):
+ def _inject_aws_credentials(self) -> None:
if TRANSFER_SPEC in self.body and AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]:
aws_hook = AwsBaseHook(self.aws_conn_id, resource_type="s3")
aws_credentials = aws_hook.get_credentials()
- aws_access_key_id = aws_credentials.access_key
- aws_secret_access_key = aws_credentials.secret_key
+ aws_access_key_id = aws_credentials.access_key # type: ignore[attr-defined]
+ aws_secret_access_key = aws_credentials.secret_key # type: ignore[attr-defined]
self.body[TRANSFER_SPEC][AWS_S3_DATA_SOURCE][AWS_ACCESS_KEY] = {
ACCESS_KEY_ID: aws_access_key_id,
SECRET_ACCESS_KEY: aws_secret_access_key,
}
- def _reformat_date(self, field_key):
+ def _reformat_date(self, field_key: str) -> None:
schedule = self.body[SCHEDULE]
if field_key not in schedule:
return
if isinstance(schedule[field_key], date):
schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
- def _reformat_time(self, field_key):
+ def _reformat_time(self, field_key: str) -> None:
schedule = self.body[SCHEDULE]
if field_key not in schedule:
return
if isinstance(schedule[field_key], time):
schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
- def _reformat_schedule(self):
+ def _reformat_schedule(self) -> None:
if SCHEDULE not in self.body:
if self.default_schedule:
self.body[SCHEDULE] = {SCHEDULE_START_DATE: date.today(), SCHEDULE_END_DATE: date.today()}
@@ -102,7 +102,7 @@ class TransferJobPreprocessor:
self._reformat_date(SCHEDULE_END_DATE)
self._reformat_time(START_TIME_OF_DAY)
- def process_body(self):
+ def process_body(self) -> dict:
"""
Injects AWS credentials into body if needed and
reformats schedule information.
@@ -115,14 +115,14 @@ class TransferJobPreprocessor:
return self.body
@staticmethod
- def _convert_date_to_dict(field_date):
+ def _convert_date_to_dict(field_date: date) -> dict:
"""
Convert native python ``datetime.date`` object to a format supported by the API
"""
return {DAY: field_date.day, MONTH: field_date.month, YEAR: field_date.year}
@staticmethod
- def _convert_time_to_dict(time_object):
+ def _convert_time_to_dict(time_object: time) -> dict:
"""
Convert native python ``datetime.time`` object to a format supported by the API
"""
@@ -140,7 +140,7 @@ class TransferJobValidator:
self.body = body
- def _verify_data_source(self):
+ def _verify_data_source(self) -> None:
is_gcs = GCS_DATA_SOURCE in self.body[TRANSFER_SPEC]
is_aws_s3 = AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]
is_http = HTTP_DATA_SOURCE in self.body[TRANSFER_SPEC]
@@ -152,7 +152,7 @@ class TransferJobValidator:
"gcsDataSource, awsS3DataSource and httpDataSource."
)
- def _restrict_aws_credentials(self):
+ def _restrict_aws_credentials(self) -> None:
aws_transfer = AWS_S3_DATA_SOURCE in self.body[TRANSFER_SPEC]
if aws_transfer and AWS_ACCESS_KEY in self.body[TRANSFER_SPEC][AWS_S3_DATA_SOURCE]:
raise AirflowException(
@@ -160,7 +160,7 @@ class TransferJobValidator:
"please use Airflow connections to store credentials."
)
- def validate_body(self):
+ def validate_body(self) -> None:
"""
Validates the body. Checks if body specifies `transferSpec`
if yes, then check if AWS credentials are passed correctly and
@@ -247,10 +247,10 @@ class CloudDataTransferServiceCreateJobOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
TransferJobValidator(body=self.body).validate_body()
- def execute(self, context):
+ def execute(self, context) -> dict:
TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
@@ -329,12 +329,12 @@ class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
TransferJobValidator(body=self.body).validate_body()
if not self.job_name:
raise AirflowException("The required parameter 'job_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> dict:
TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
@@ -405,11 +405,11 @@ class CloudDataTransferServiceDeleteJobOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.job_name:
raise AirflowException("The required parameter 'job_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> None:
self._validate_inputs()
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
@@ -471,11 +471,11 @@ class CloudDataTransferServiceGetOperationOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.operation_name:
raise AirflowException("The required parameter 'operation_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
@@ -545,11 +545,11 @@ class CloudDataTransferServiceListOperationsOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.filter:
raise AirflowException("The required parameter 'filter' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> List[dict]:
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
@@ -611,11 +611,11 @@ class CloudDataTransferServicePauseOperationOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.operation_name:
raise AirflowException("The required parameter 'operation_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
@@ -675,11 +675,11 @@ class CloudDataTransferServiceResumeOperationOperator(BaseOperator):
self._validate_inputs()
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.operation_name:
raise AirflowException("The required parameter 'operation_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
@@ -740,11 +740,11 @@ class CloudDataTransferServiceCancelOperationOperator(BaseOperator):
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.operation_name:
raise AirflowException("The required parameter 'operation_name' is empty or None")
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
@@ -868,7 +868,7 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
self.timeout = timeout
self.google_impersonation_chain = google_impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -883,7 +883,7 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
if self.wait:
hook.wait_for_transfer_job(job, timeout=self.timeout)
- def _create_body(self):
+ def _create_body(self) -> dict:
body = {
DESCRIPTION: self.description,
STATUS: GcpTransferJobsStatus.ENABLED,
@@ -900,10 +900,10 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
body[SCHEDULE] = self.schedule
if self.object_conditions is not None:
- body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions
+ body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions # type: ignore[index]
if self.transfer_options is not None:
- body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options
+ body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options # type: ignore[index]
return body
@@ -1023,7 +1023,7 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
self.timeout = timeout
self.google_impersonation_chain = google_impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -1039,7 +1039,7 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
if self.wait:
hook.wait_for_transfer_job(job, timeout=self.timeout)
- def _create_body(self):
+ def _create_body(self) -> dict:
body = {
DESCRIPTION: self.description,
STATUS: GcpTransferJobsStatus.ENABLED,
@@ -1056,9 +1056,9 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
body[SCHEDULE] = self.schedule
if self.object_conditions is not None:
- body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions
+ body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions # type: ignore[index]
if self.transfer_options is not None:
- body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options
+ body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options # type: ignore[index]
return body
diff --git a/airflow/providers/google/cloud/operators/compute.py b/airflow/providers/google/cloud/operators/compute.py
index 5d66642..d5bc4dc 100644
--- a/airflow/providers/google/cloud/operators/compute.py
+++ b/airflow/providers/google/cloud/operators/compute.py
@@ -59,7 +59,7 @@ class ComputeEngineBaseOperator(BaseOperator):
self._validate_inputs()
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is missing")
if not self.zone:
@@ -139,7 +139,7 @@ class ComputeEngineStartInstanceOperator(ComputeEngineBaseOperator):
**kwargs,
)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = ComputeEngineHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -216,7 +216,7 @@ class ComputeEngineStopInstanceOperator(ComputeEngineBaseOperator):
**kwargs,
)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = ComputeEngineHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -312,11 +312,11 @@ class ComputeEngineSetMachineTypeOperator(ComputeEngineBaseOperator):
**kwargs,
)
- def _validate_all_body_fields(self):
+ def _validate_all_body_fields(self) -> None:
if self._field_validator:
self._field_validator.validate(self.body)
- def execute(self, context):
+ def execute(self, context) -> None:
hook = ComputeEngineHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -482,11 +482,11 @@ class ComputeEngineCopyInstanceTemplateOperator(ComputeEngineBaseOperator):
**kwargs,
)
- def _validate_all_body_fields(self):
+ def _validate_all_body_fields(self) -> None:
if self._field_validator:
self._field_validator.validate(self.body_patch)
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = ComputeEngineHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -623,12 +623,12 @@ class ComputeEngineInstanceGroupUpdateManagerTemplateOperator(ComputeEngineBaseO
**kwargs,
)
- def _possibly_replace_template(self, dictionary: Dict) -> None:
+ def _possibly_replace_template(self, dictionary: dict) -> None:
if dictionary.get('instanceTemplate') == self.source_template:
dictionary['instanceTemplate'] = self.destination_template
self._change_performed = True
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
hook = ComputeEngineHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
diff --git a/airflow/providers/google/cloud/operators/datacatalog.py b/airflow/providers/google/cloud/operators/datacatalog.py
index 9264742..00b2765 100644
--- a/airflow/providers/google/cloud/operators/datacatalog.py
+++ b/airflow/providers/google/cloud/operators/datacatalog.py
@@ -124,7 +124,7 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -240,7 +240,7 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -362,7 +362,7 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -489,7 +489,7 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -609,7 +609,7 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -718,7 +718,7 @@ class CloudDataCatalogDeleteEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -810,7 +810,7 @@ class CloudDataCatalogDeleteEntryGroupOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -909,7 +909,7 @@ class CloudDataCatalogDeleteTagOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1007,7 +1007,7 @@ class CloudDataCatalogDeleteTagTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1107,7 +1107,7 @@ class CloudDataCatalogDeleteTagTemplateFieldOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1203,7 +1203,7 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1299,7 +1299,7 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1387,7 +1387,7 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1487,7 +1487,7 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> list:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1578,7 +1578,7 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1676,7 +1676,7 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1796,7 +1796,7 @@ class CloudDataCatalogSearchCatalogOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> list:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -1906,7 +1906,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -2020,7 +2020,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -2132,7 +2132,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -2254,7 +2254,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 6bdd907..f644008 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -227,14 +227,14 @@ class DataflowCreateJavaJobOperator(BaseOperator):
dataflow_options.update(self.options)
is_running = False
if self.check_if_running != CheckJobRunning.IgnoreJob:
- is_running = self.hook.is_job_dataflow_running(
+ is_running = self.hook.is_job_dataflow_running( # type: ignore[attr-defined]
name=self.job_name,
variables=dataflow_options,
project_id=self.project_id,
location=self.location,
)
while is_running and self.check_if_running == CheckJobRunning.WaitForRun:
- is_running = self.hook.is_job_dataflow_running(
+ is_running = self.hook.is_job_dataflow_running( # type: ignore[attr-defined]
name=self.job_name,
variables=dataflow_options,
project_id=self.project_id,
@@ -253,7 +253,7 @@ class DataflowCreateJavaJobOperator(BaseOperator):
def set_current_job_id(job_id):
self.job_id = job_id
- self.hook.start_java_dataflow(
+ self.hook.start_java_dataflow( # type: ignore[attr-defined]
job_name=self.job_name,
variables=dataflow_options,
jar=self.jar,
@@ -419,7 +419,7 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
self.environment = environment
- def execute(self, context):
+ def execute(self, context) -> dict:
self.hook = DataflowHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -585,7 +585,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
def set_current_job_id(job_id):
self.job_id = job_id
- self.hook.start_python_dataflow(
+ self.hook.start_python_dataflow( # type: ignore[attr-defined]
job_name=self.job_name,
variables=formatted_options,
dataflow=self.py_file,
diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py
index 058e81f..c02fb94 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -90,7 +90,7 @@ class CloudDataFusionRestartInstanceOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -167,7 +167,7 @@ class CloudDataFusionDeleteInstanceOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -250,7 +250,7 @@ class CloudDataFusionCreateInstanceOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -360,7 +360,7 @@ class CloudDataFusionUpdateInstanceOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -439,7 +439,7 @@ class CloudDataFusionGetInstanceOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -529,7 +529,7 @@ class CloudDataFusionCreatePipelineOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -626,7 +626,7 @@ class CloudDataFusionDeletePipelineOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -724,7 +724,7 @@ class CloudDataFusionListPipelinesOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -832,7 +832,7 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -934,7 +934,7 @@ class CloudDataFusionStopPipelineOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataFusionHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
diff --git a/airflow/providers/google/cloud/operators/dataprep.py b/airflow/providers/google/cloud/operators/dataprep.py
index 8d08c5a..e53ba84 100644
--- a/airflow/providers/google/cloud/operators/dataprep.py
+++ b/airflow/providers/google/cloud/operators/dataprep.py
@@ -19,8 +19,6 @@
This module contains a Google Dataprep operator.
"""
-from typing import Dict
-
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataprep import GoogleDataprepHook
from airflow.utils.decorators import apply_defaults
@@ -47,7 +45,7 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
self.dataprep_conn_id = (dataprep_conn_id,)
self.job_id = job_id
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
self.log.info("Fetching data for job with id: %d ...", self.job_id)
hook = GoogleDataprepHook(
dataprep_conn_id="dataprep_default",
@@ -92,7 +90,7 @@ class DataprepGetJobGroupOperator(BaseOperator):
self.embed = embed
self.include_deleted = include_deleted
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> dict:
self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
response = hook.get_job_group(
@@ -121,7 +119,7 @@ class DataprepRunJobGroupOperator(BaseOperator):
self.body_request = body_request
self.dataprep_conn_id = dataprep_conn_id
- def execute(self, context: None):
+ def execute(self, context: None) -> dict:
self.log.info("Creating a job...")
hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
response = hook.run_job_group(body_request=self.body_request)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 03efe00..c6fcdc0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -229,7 +229,7 @@ class ClusterGenerator:
if self.single_node and self.num_preemptible_workers > 0:
raise ValueError("Single node cannot have preemptible workers.")
- def _get_init_action_timeout(self):
+ def _get_init_action_timeout(self) -> dict:
match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
if match:
val = float(match.group(1))
@@ -553,7 +553,7 @@ class DataprocCreateClusterOperator(BaseOperator):
self.log.info("Deleting the cluster")
hook.delete_cluster(region=self.region, cluster_name=self.cluster_name, project_id=self.project_id)
- def _get_cluster(self, hook: DataprocHook):
+ def _get_cluster(self, hook: DataprocHook) -> Cluster:
return hook.get_cluster(
project_id=self.project_id,
region=self.region,
@@ -601,7 +601,7 @@ class DataprocCreateClusterOperator(BaseOperator):
cluster = self._get_cluster(hook)
return cluster
- def execute(self, context):
+ def execute(self, context) -> dict:
self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
try:
@@ -711,7 +711,7 @@ class DataprocScaleClusterOperator(BaseOperator):
stacklevel=1,
)
- def _build_scale_cluster_data(self):
+ def _build_scale_cluster_data(self) -> dict:
scale_data = {
'config': {
'worker_config': {'num_instances': self.num_workers},
@@ -749,7 +749,7 @@ class DataprocScaleClusterOperator(BaseOperator):
return {'seconds': timeout}
- def execute(self, context):
+ def execute(self, context) -> None:
"""
Scale, up or down, a cluster on Google Cloud Dataproc.
"""
@@ -839,7 +839,7 @@ class DataprocDeleteClusterOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
self.log.info("Deleting cluster: %s", self.cluster_name)
operation = hook.delete_cluster(
@@ -974,7 +974,7 @@ class DataprocJobBaseOperator(BaseOperator):
self.job_template.add_jar_file_uris(self.dataproc_jars)
self.job_template.add_labels(self.labels)
- def _generate_job_template(self):
+ def _generate_job_template(self) -> str:
if self.job_template:
job = self.job_template.build()
return job['job']
@@ -999,7 +999,7 @@ class DataprocJobBaseOperator(BaseOperator):
else:
raise AirflowException("Create a job template before")
- def on_kill(self):
+ def on_kill(self) -> None:
"""
Callback called when the operator is killed.
Cancel any running job.
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 91e50f4..30f4548 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -113,7 +113,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
- def execute(self, context):
+ def execute(self, context) -> dict:
self.log.info('Exporting data to Cloud Storage bucket %s', self.bucket)
if self.overwrite_existing and self.namespace:
@@ -305,7 +305,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> list:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -373,7 +373,7 @@ class CloudDatastoreBeginTransactionOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> str:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -441,7 +441,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -509,7 +509,7 @@ class CloudDatastoreRollbackOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -576,7 +576,7 @@ class CloudDatastoreRunQueryOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -693,7 +693,7 @@ class CloudDatastoreDeleteOperationOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = DatastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/dlp.py b/airflow/providers/google/cloud/operators/dlp.py
index b2623a5..5d28d66 100644
--- a/airflow/providers/google/cloud/operators/dlp.py
+++ b/airflow/providers/google/cloud/operators/dlp.py
@@ -112,7 +112,7 @@ class CloudDLPCancelDLPJobOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDLPHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -749,7 +749,7 @@ class CloudDLPDeidentifyContentOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = CloudDLPHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -839,7 +839,7 @@ class CloudDLPDeleteDeidentifyTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDLPHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -923,7 +923,7 @@ class CloudDLPDeleteDLPJobOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDLPHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -1011,7 +1011,7 @@ class CloudDLPDeleteInspectTemplateOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudDLPHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/functions.py b/airflow/providers/google/cloud/operators/functions.py
index a2a2043..7066e1b 100644
--- a/airflow/providers/google/cloud/operators/functions.py
+++ b/airflow/providers/google/cloud/operators/functions.py
@@ -180,24 +180,24 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
self._validate_inputs()
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.location:
raise AirflowException("The required parameter 'location' is missing")
if not self.body:
raise AirflowException("The required parameter 'body' is missing")
self.zip_path_preprocessor.preprocess_body()
- def _validate_all_body_fields(self):
+ def _validate_all_body_fields(self) -> None:
if self._field_validator:
self._field_validator.validate(self.body)
- def _create_new_function(self, hook):
+ def _create_new_function(self, hook) -> None:
hook.create_new_function(project_id=self.project_id, location=self.location, body=self.body)
- def _update_function(self, hook):
+ def _update_function(self, hook) -> None:
hook.update_function(self.body['name'], self.body, self.body.keys())
- def _check_if_function_exists(self, hook):
+ def _check_if_function_exists(self, hook) -> bool:
name = self.body.get('name')
if not name:
raise GcpFieldValidationException(
@@ -217,7 +217,7 @@ class CloudFunctionDeployFunctionOperator(BaseOperator):
project_id=self.project_id, location=self.location, zip_path=self.zip_path
)
- def _set_airflow_version_label(self):
+ def _set_airflow_version_label(self) -> None:
if 'labels' not in self.body.keys():
self.body['labels'] = {}
self.body['labels'].update({'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
@@ -274,10 +274,10 @@ class ZipPathPreprocessor:
self.zip_path = zip_path
@staticmethod
- def _is_present_and_empty(dictionary, field):
+ def _is_present_and_empty(dictionary, field) -> bool:
return field in dictionary and not dictionary[field]
- def _verify_upload_url_and_no_zip_path(self):
+ def _verify_upload_url_and_no_zip_path(self) -> None:
if self._is_present_and_empty(self.body, GCF_SOURCE_UPLOAD_URL):
if not self.zip_path:
raise AirflowException(
@@ -286,7 +286,7 @@ class ZipPathPreprocessor:
"when '{url}' is present and empty.".format(url=GCF_SOURCE_UPLOAD_URL, path=GCF_ZIP_PATH)
)
- def _verify_upload_url_and_zip_path(self):
+ def _verify_upload_url_and_zip_path(self) -> None:
if GCF_SOURCE_UPLOAD_URL in self.body and self.zip_path:
if not self.body[GCF_SOURCE_UPLOAD_URL]:
self.upload_function = True
@@ -296,7 +296,7 @@ class ZipPathPreprocessor:
"allowed. Found both.".format(GCF_SOURCE_UPLOAD_URL, GCF_ZIP_PATH)
)
- def _verify_archive_url_and_zip_path(self):
+ def _verify_archive_url_and_zip_path(self) -> None:
if GCF_SOURCE_ARCHIVE_URL in self.body and self.zip_path:
raise AirflowException(
"Only one of '{}' in body or '{}' argument "
@@ -313,7 +313,7 @@ class ZipPathPreprocessor:
raise AirflowException('validate() method has to be invoked before ' 'should_upload_function')
return self.upload_function
- def preprocess_body(self):
+ def preprocess_body(self) -> None:
"""
Modifies sourceUploadUrl body field in special way when zip_path
is not empty.
@@ -381,7 +381,7 @@ class CloudFunctionDeleteFunctionOperator(BaseOperator):
self._validate_inputs()
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.name:
raise AttributeError('Empty parameter: name')
else:
diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index fea3eda..a4c38b0 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -151,7 +151,7 @@ class GCSCreateBucketOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = GCSHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -258,7 +258,7 @@ class GCSListObjectsOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> list:
hook = GCSHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
@@ -445,7 +445,7 @@ class GCSBucketCreateAclEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = GCSHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -544,7 +544,7 @@ class GCSObjectCreateAclEntryOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = GCSHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -624,7 +624,7 @@ class GCSFileTransformOperator(BaseOperator):
self.output_encoding = sys.getdefaultencoding()
self.impersonation_chain = impersonation_chain
- def execute(self, context: Dict):
+ def execute(self, context: dict) -> None:
hook = GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
with NamedTemporaryFile() as source_file, NamedTemporaryFile() as destination_file:
@@ -707,7 +707,7 @@ class GCSDeleteBucketOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
hook.delete_bucket(bucket_name=self.bucket_name, force=self.force)
@@ -805,7 +805,7 @@ class GCSSynchronizeBucketsOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = GCSHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index be24fda..5212fba 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -111,12 +111,12 @@ class GKEDeleteClusterOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
self._check_input()
- def _check_input(self):
+ def _check_input(self) -> None:
if not all([self.project_id, self.name, self.location]):
self.log.error('One of (project_id, name, location) is missing or incorrect')
raise AirflowException('Operator has incorrect or missing input.')
- def execute(self, context):
+ def execute(self, context) -> Optional[str]:
hook = GKEHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
@@ -214,7 +214,7 @@ class GKECreateClusterOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
self._check_input()
- def _check_input(self):
+ def _check_input(self) -> None:
if not all([self.project_id, self.location, self.body]) or not (
(isinstance(self.body, dict) and "name" in self.body and "initial_node_count" in self.body)
or (getattr(self.body, "name", None) and getattr(self.body, "initial_node_count", None))
@@ -225,7 +225,7 @@ class GKECreateClusterOperator(BaseOperator):
)
raise AirflowException("Operator has incorrect or missing input.")
- def execute(self, context):
+ def execute(self, context) -> str:
hook = GKEHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
@@ -299,7 +299,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
"called `google_cloud_default`.",
)
- def execute(self, context):
+ def execute(self, context) -> Optional[str]:
hook = GoogleBaseHook(gcp_conn_id=self.gcp_conn_id)
self.project_id = self.project_id or hook.project_id
diff --git a/airflow/providers/google/cloud/operators/life_sciences.py b/airflow/providers/google/cloud/operators/life_sciences.py
index c4d9cd7..0c82927 100644
--- a/airflow/providers/google/cloud/operators/life_sciences.py
+++ b/airflow/providers/google/cloud/operators/life_sciences.py
@@ -83,13 +83,13 @@ class LifeSciencesRunPipelineOperator(BaseOperator):
self._validate_inputs()
self.impersonation_chain = impersonation_chain
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if not self.body:
raise AirflowException("The required parameter 'body' is missing")
if not self.location:
raise AirflowException("The required parameter 'location' is missing")
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = LifeSciencesHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py
index 544b615..e539e6d 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -172,7 +172,7 @@ class PubSubCreateTopicOperator(BaseOperator):
self.metadata = metadata
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -421,7 +421,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
self.metadata = metadata
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> str:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -566,7 +566,7 @@ class PubSubDeleteTopicOperator(BaseOperator):
self.metadata = metadata
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -699,7 +699,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
self.metadata = metadata
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -820,7 +820,7 @@ class PubSubPublishMessageOperator(BaseOperator):
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> None:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -919,7 +919,7 @@ class PubSubPullOperator(BaseOperator):
self.messages_callback = messages_callback
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> list:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
@@ -950,7 +950,7 @@ class PubSubPullOperator(BaseOperator):
self,
pulled_messages: List[ReceivedMessage],
context: Dict[str, Any], # pylint: disable=unused-argument
- ):
+ ) -> list:
"""
This method can be overridden by subclasses or by `messages_callback` constructor argument.
This default implementation converts `ReceivedMessage` objects into JSON-serializable dicts.
diff --git a/airflow/providers/google/cloud/operators/spanner.py b/airflow/providers/google/cloud/operators/spanner.py
index 9f14cca..12019bd 100644
--- a/airflow/providers/google/cloud/operators/spanner.py
+++ b/airflow/providers/google/cloud/operators/spanner.py
@@ -98,13 +98,13 @@ class SpannerDeployInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
raise AirflowException("The required parameter 'instance_id' " "is empty or None")
- def execute(self, context):
+ def execute(self, context) -> None:
hook = SpannerHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -177,13 +177,13 @@ class SpannerDeleteInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
raise AirflowException("The required parameter 'instance_id' " "is empty or None")
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
hook = SpannerHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -263,7 +263,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
@@ -297,7 +297,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
)
@staticmethod
- def sanitize_queries(queries):
+ def sanitize_queries(queries: List[str]) -> None:
"""
Drops empty query in queries.
@@ -373,7 +373,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
@@ -381,7 +381,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
if not self.database_id:
raise AirflowException("The required parameter 'database_id' is empty" " or None")
- def execute(self, context):
+ def execute(self, context) -> Optional[bool]:
hook = SpannerHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -480,7 +480,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
@@ -490,7 +490,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
if not self.ddl_statements:
raise AirflowException("The required parameter 'ddl_statements' is empty" " or None")
- def execute(self, context):
+ def execute(self, context) -> None:
hook = SpannerHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -570,7 +570,7 @@ class SpannerDeleteDatabaseInstanceOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.project_id == '':
raise AirflowException("The required parameter 'project_id' is empty")
if not self.instance_id:
@@ -578,7 +578,7 @@ class SpannerDeleteDatabaseInstanceOperator(BaseOperator):
if not self.database_id:
raise AirflowException("The required parameter 'database_id' is empty" " or None")
- def execute(self, context):
+ def execute(self, context) -> bool:
hook = SpannerHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/speech_to_text.py b/airflow/providers/google/cloud/operators/speech_to_text.py
index 148cd28..b8c7933 100644
--- a/airflow/providers/google/cloud/operators/speech_to_text.py
+++ b/airflow/providers/google/cloud/operators/speech_to_text.py
@@ -102,7 +102,7 @@ class CloudSpeechToTextRecognizeSpeechOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
if self.audio == "":
raise AirflowException("The required parameter 'audio' is empty")
if self.config == "":
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index 517fe5a..dc86466 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -108,7 +108,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.format_ = format_
self.filter_ = filter_
@@ -212,7 +212,7 @@ class StackdriverEnableAlertPoliciesOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
@@ -303,7 +303,7 @@ class StackdriverDisableAlertPoliciesOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
@@ -396,7 +396,7 @@ class StackdriverUpsertAlertOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.alerts = alerts
self.retry = retry
@@ -485,7 +485,7 @@ class StackdriverDeleteAlertOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.name = name
self.retry = retry
@@ -597,7 +597,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.format_ = format_
self.filter_ = filter_
@@ -701,7 +701,7 @@ class StackdriverEnableNotificationChannelsOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.filter_ = filter_
self.retry = retry
@@ -794,7 +794,7 @@ class StackdriverDisableNotificationChannelsOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.filter_ = filter_
self.retry = retry
@@ -889,7 +889,7 @@ class StackdriverUpsertNotificationChannelOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.channels = channels
self.retry = retry
@@ -980,7 +980,7 @@ class StackdriverDeleteNotificationChannelOperator(BaseOperator):
delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
- ):
+ ) -> None:
super().__init__(**kwargs)
self.name = name
self.retry = retry
diff --git a/airflow/providers/google/cloud/operators/text_to_speech.py b/airflow/providers/google/cloud/operators/text_to_speech.py
index 21e3a7a..16dfe1b 100644
--- a/airflow/providers/google/cloud/operators/text_to_speech.py
+++ b/airflow/providers/google/cloud/operators/text_to_speech.py
@@ -119,7 +119,7 @@ class CloudTextToSpeechSynthesizeOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
super().__init__(**kwargs)
- def _validate_inputs(self):
+ def _validate_inputs(self) -> None:
for parameter in [
"input_data",
"voice",
@@ -130,7 +130,7 @@ class CloudTextToSpeechSynthesizeOperator(BaseOperator):
if getattr(self, parameter) == "":
raise AirflowException("The required parameter '{}' is empty".format(parameter))
- def execute(self, context):
+ def execute(self, context) -> None:
hook = CloudTextToSpeechHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/translate.py b/airflow/providers/google/cloud/operators/translate.py
index c931c6f..bfc33be 100644
--- a/airflow/providers/google/cloud/operators/translate.py
+++ b/airflow/providers/google/cloud/operators/translate.py
@@ -118,7 +118,7 @@ class CloudTranslateTextOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> dict:
hook = CloudTranslateHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
diff --git a/airflow/providers/google/cloud/operators/translate_speech.py b/airflow/providers/google/cloud/operators/translate_speech.py
index 8f8380b..ae75681 100644
--- a/airflow/providers/google/cloud/operators/translate_speech.py
+++ b/airflow/providers/google/cloud/operators/translate_speech.py
@@ -146,7 +146,7 @@ class CloudTranslateSpeechOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context):
+ def execute(self, context) -> dict:
speech_to_text_hook = CloudSpeechToTextHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,