Posted to commits@airflow.apache.org by vi...@apache.org on 2023/09/28 14:48:30 UTC
[airflow] branch main updated: Refactor usage of str() in providers (#34320)
This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ebf4220c9 Refactor usage of str() in providers (#34320)
7ebf4220c9 is described below
commit 7ebf4220c9abd001f1fa23c95f882efddd5afbac
Author: Miroslav Šedivý <67...@users.noreply.github.com>
AuthorDate: Thu Sep 28 14:48:20 2023 +0000
Refactor usage of str() in providers (#34320)
---
.../alibaba/cloud/log/oss_task_handler.py | 6 ++--
airflow/providers/amazon/aws/hooks/base_aws.py | 2 +-
airflow/providers/amazon/aws/hooks/s3.py | 9 +++---
.../providers/apache/spark/hooks/spark_submit.py | 2 +-
.../celery/executors/celery_executor_utils.py | 4 +--
.../kubernetes/executors/kubernetes_executor.py | 4 +--
.../cncf/kubernetes/python_kubernetes_script.py | 4 +--
.../providers/cncf/kubernetes/utils/delete_from.py | 2 +-
.../providers/cncf/kubernetes/utils/pod_manager.py | 35 +++++++++++-----------
airflow/providers/common/sql/operators/sql.py | 4 +--
.../databricks/operators/databricks_sql.py | 6 ++--
.../databricks/sensors/databricks_partition.py | 2 +-
airflow/providers/dbt/cloud/operators/dbt.py | 8 ++---
airflow/providers/docker/operators/docker_swarm.py | 2 +-
airflow/providers/google/cloud/hooks/cloud_sql.py | 2 +-
airflow/providers/google/cloud/hooks/dataflow.py | 4 +--
.../providers/google/cloud/operators/dataflow.py | 2 +-
.../providers/google/cloud/operators/dataproc.py | 6 ++--
.../providers/google/cloud/sensors/bigquery_dts.py | 2 +-
.../google/cloud/transfers/facebook_ads_to_gcs.py | 2 +-
.../providers/google/leveldb/operators/leveldb.py | 2 +-
airflow/providers/grpc/hooks/grpc.py | 2 +-
airflow/providers/http/hooks/http.py | 6 ++--
.../microsoft/azure/log/wasb_task_handler.py | 2 +-
.../microsoft/azure/transfers/sftp_to_wasb.py | 2 +-
airflow/providers/oracle/hooks/oracle.py | 8 ++---
.../amazon/aws/operators/test_emr_serverless.py | 2 +-
.../microsoft/azure/operators/test_asb.py | 2 +-
.../system/providers/amazon/aws/utils/__init__.py | 2 +-
.../providers/docker/example_docker_copy_data.py | 4 +--
30 files changed, 70 insertions(+), 70 deletions(-)
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 602183a992..e244c54e42 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -189,7 +189,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
if append and self.oss_log_exists(oss_remote_log_location):
head = self.hook.head_key(self.bucket_name, oss_remote_log_location)
pos = head.content_length
- self.log.info("log write pos is: %s", str(pos))
+ self.log.info("log write pos is: %s", pos)
try:
self.log.info("writing remote log: %s", oss_remote_log_location)
self.hook.append_string(self.bucket_name, log, oss_remote_log_location, pos)
@@ -197,8 +197,8 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
self.log.exception(
"Could not write logs to %s, log write pos is: %s, Append is %s",
oss_remote_log_location,
- str(pos),
- str(append),
+ pos,
+ append,
)
return False
return True
diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py
index 742ec52932..a990a23168 100644
--- a/airflow/providers/amazon/aws/hooks/base_aws.py
+++ b/airflow/providers/amazon/aws/hooks/base_aws.py
@@ -839,7 +839,7 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
return True, ", ".join(f"{k}={v!r}" for k, v in conn_info.items())
except Exception as e:
- return False, str(f"{type(e).__name__!r} error occurred while testing connection: {e}")
+ return False, f"{type(e).__name__!r} error occurred while testing connection: {e}"
@cached_property
def waiter_path(self) -> os.PathLike[str] | None:
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 0d3b3003bc..9eeeaf876b 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -350,7 +350,8 @@ class S3Hook(AwsBaseHook):
:param delimiter: the delimiter marks key hierarchy.
:return: False if the prefix does not exist in the bucket and True if it does.
"""
- prefix = prefix + delimiter if prefix[-1] != delimiter else prefix
+ if not prefix.endswith(delimiter):
+ prefix += delimiter
prefix_split = re.split(rf"(\w+[{delimiter}])$", prefix, 1)
previous_level = prefix_split[0]
plist = self.list_prefixes(bucket_name, previous_level, delimiter)
@@ -544,7 +545,8 @@ class S3Hook(AwsBaseHook):
:param delimiter: the delimiter marks key hierarchy.
:return: False if the prefix does not exist in the bucket and True if it does.
"""
- prefix = prefix + delimiter if prefix[-1] != delimiter else prefix
+ if not prefix.endswith(delimiter):
+ prefix += delimiter
prefix_split = re.split(rf"(\w+[{delimiter}])$", prefix, 1)
previous_level = prefix_split[0]
plist = await self.list_prefixes_async(client, bucket_name, previous_level, delimiter)
@@ -576,8 +578,7 @@ class S3Hook(AwsBaseHook):
response = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter=delimiter)
async for page in response:
if "Contents" in page:
- _temp = [k for k in page["Contents"] if isinstance(k.get("Size", None), (int, float))]
- keys = keys + _temp
+ keys.extend(k for k in page["Contents"] if isinstance(k.get("Size"), (int, float)))
return keys
@staticmethod
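
A note on the s3.py hunks above: str.endswith also copes with an empty prefix, whereas the old prefix[-1] != delimiter indexing raises IndexError on an empty string (whether an empty prefix can actually reach this code path is not established here). A minimal sketch of the new form, using a hypothetical ensure_suffix helper purely for illustration:

    def ensure_suffix(prefix: str, delimiter: str = "/") -> str:
        # Same shape as the new check_for_prefix code: append the delimiter
        # only when it is not already the final character.
        if not prefix.endswith(delimiter):
            prefix += delimiter
        return prefix

    print(ensure_suffix("logs/2023"))   # logs/2023/
    print(ensure_suffix("logs/2023/"))  # logs/2023/
    print(ensure_suffix(""))            # "/", where prefix[-1] would raise IndexError
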
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index f95a9d0341..b6b5b93968 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -268,7 +268,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
connection_cmd += ["--master", self._connection["master"]]
for key in self._conf:
- connection_cmd += ["--conf", f"{key}={str(self._conf[key])}"]
+ connection_cmd += ["--conf", f"{key}={self._conf[key]}"]
if self._env_vars and (self._is_kubernetes or self._is_yarn):
if self._is_yarn:
tmpl = "spark.yarn.appMasterEnv.{}={}"
diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py
index a3b01a3bb5..90cbc2f57c 100644
--- a/airflow/providers/celery/executors/celery_executor_utils.py
+++ b/airflow/providers/celery/executors/celery_executor_utils.py
@@ -154,8 +154,8 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None =
setproctitle(f"airflow task supervisor: {command_to_exec}")
args.func(args)
ret = 0
- except Exception as e:
- log.exception("[%s] Failed to execute task %s.", celery_task_id, str(e))
+ except Exception:
+ log.exception("[%s] Failed to execute task.", celery_task_id)
ret = 1
finally:
Sentry.flush()
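
For context on the celery_executor_utils.py hunk above: log.exception() already records the active exception and its full traceback, so formatting str(e) into the message only duplicated it. A minimal, self-contained sketch (the task id is made up for illustration):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("example")

    try:
        raise RuntimeError("boom")
    except Exception:
        # logging.exception logs at ERROR level and appends the traceback,
        # so the exception text need not be interpolated into the message.
        log.exception("[%s] Failed to execute task.", "celery-task-id-123")
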
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 3780717685..40ac4a2231 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -438,10 +438,10 @@ class KubernetesExecutor(BaseExecutor):
if self.kube_config.delete_worker_pods:
if state != TaskInstanceState.FAILED or self.kube_config.delete_worker_pods_on_failure:
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
- self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
+ self.log.info("Deleted pod: %s in namespace %s", key, namespace)
else:
self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)
- self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
+ self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)
try:
self.running.remove(key)
diff --git a/airflow/providers/cncf/kubernetes/python_kubernetes_script.py b/airflow/providers/cncf/kubernetes/python_kubernetes_script.py
index becd3633bc..01fbfd7561 100644
--- a/airflow/providers/cncf/kubernetes/python_kubernetes_script.py
+++ b/airflow/providers/cncf/kubernetes/python_kubernetes_script.py
@@ -31,9 +31,9 @@ def _balance_parens(after_decorator):
while num_paren:
current = after_decorator.popleft()
if current == "(":
- num_paren = num_paren + 1
+ num_paren += 1
elif current == ")":
- num_paren = num_paren - 1
+ num_paren -= 1
return "".join(after_decorator)
diff --git a/airflow/providers/cncf/kubernetes/utils/delete_from.py b/airflow/providers/cncf/kubernetes/utils/delete_from.py
index 663242fad1..917c24edf1 100644
--- a/airflow/providers/cncf/kubernetes/utils/delete_from.py
+++ b/airflow/providers/cncf/kubernetes/utils/delete_from.py
@@ -138,7 +138,7 @@ def _delete_from_yaml_single_item(
else:
resp = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs)
if verbose:
- print(f"{kind} deleted. status='{str(resp.status)}'")
+ print(f"{kind} deleted. status='{resp.status}'")
return resp
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 7b0d8d94b1..cfb262cdec 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -729,23 +729,24 @@ class PodManager(LoggingMixin):
self._exec_pod_command(resp, "kill -s SIGINT 1")
def _exec_pod_command(self, resp, command: str) -> str | None:
- res = None
- if resp.is_open():
- self.log.info("Running command... %s\n", command)
- resp.write_stdin(command + "\n")
- while resp.is_open():
- resp.update(timeout=1)
- while resp.peek_stdout():
- res = res + resp.read_stdout() if res else resp.read_stdout()
- error_res = None
- while resp.peek_stderr():
- error_res = error_res + resp.read_stderr() if error_res else resp.read_stderr()
- if error_res:
- self.log.info("stderr from command: %s", error_res)
- break
- if res:
- return res
- return res
+ res = ""
+ if not resp.is_open():
+ return None
+ self.log.info("Running command... %s", command)
+ resp.write_stdin(f"{command}\n")
+ while resp.is_open():
+ resp.update(timeout=1)
+ while resp.peek_stdout():
+ res += resp.read_stdout()
+ error_res = ""
+ while resp.peek_stderr():
+ error_res += resp.read_stderr()
+ if error_res:
+ self.log.info("stderr from command: %s", error_res)
+ break
+ if res:
+ return res
+ return None
class OnFinishAction(str, enum.Enum):
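
The _exec_pod_command rewrite above drops the res = res + ... if res else ... pattern in favour of a string initialised to "" with +=, returning None only when nothing was read. A rough sketch of that accumulation, with a stub standing in for the Kubernetes exec stream (the stub is illustrative only, not the real WSClient API):

    from __future__ import annotations
    from collections import deque

    class FakeStream:
        """Illustrative stand-in for the kubernetes exec stream; not the real API."""
        def __init__(self, chunks):
            self._chunks = deque(chunks)
        def peek_stdout(self) -> bool:
            return bool(self._chunks)
        def read_stdout(self) -> str:
            return self._chunks.popleft()

    def drain_stdout(stream) -> str | None:
        res = ""
        while stream.peek_stdout():
            res += stream.read_stdout()
        return res or None  # as in the new code: empty output maps to None

    print(drain_stdout(FakeStream(["line 1\n", "line 2\n"])))  # "line 1\nline 2\n"
    print(drain_stdout(FakeStream([])))                        # None
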
diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index 2dba7b12cb..bfccd9c1fc 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -945,8 +945,8 @@ class SQLIntervalCheckOperator(BaseSQLOperator):
sqlexp = ", ".join(self.metrics_sorted)
sqlt = f"SELECT {sqlexp} FROM {table} WHERE {date_filter_column}="
- self.sql1 = sqlt + "'{{ ds }}'"
- self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
+ self.sql1 = f"{sqlt}'{{{{ ds }}}}'"
+ self.sql2 = f"{sqlt}'{{{{ macros.ds_add(ds, {self.days_back}) }}}}'"
def execute(self, context: Context):
hook = self.get_db_hook()
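
About the sql.py hunk above: inside an f-string, doubled braces render as literal braces, so {{{{ ds }}}} comes out as the {{ ds }} Jinja placeholder while {self.days_back} is interpolated as usual. A standalone check (table and column names are invented):

    days_back = -7
    sqlt = "SELECT col FROM my_table WHERE ds="
    sql1 = f"{sqlt}'{{{{ ds }}}}'"
    sql2 = f"{sqlt}'{{{{ macros.ds_add(ds, {days_back}) }}}}'"
    print(sql1)  # SELECT col FROM my_table WHERE ds='{{ ds }}'
    print(sql2)  # SELECT col FROM my_table WHERE ds='{{ macros.ds_add(ds, -7) }}'
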
diff --git a/airflow/providers/databricks/operators/databricks_sql.py b/airflow/providers/databricks/operators/databricks_sql.py
index 3b86fc8f3a..b40a1da115 100644
--- a/airflow/providers/databricks/operators/databricks_sql.py
+++ b/airflow/providers/databricks/operators/databricks_sql.py
@@ -339,13 +339,11 @@ class DatabricksCopyIntoOperator(BaseOperator):
elif isinstance(self._validate, int):
if self._validate < 0:
raise AirflowException(
- "Number of rows for validation should be positive, got: " + str(self._validate)
+ f"Number of rows for validation should be positive, got: {self._validate}"
)
validation = f"VALIDATE {self._validate} ROWS\n"
else:
- raise AirflowException(
- "Incorrect data type for validate parameter: " + str(type(self._validate))
- )
+ raise AirflowException(f"Incorrect data type for validate parameter: {type(self._validate)}")
# TODO: think on how to make sure that table_name and expression_list aren't used for SQL injection
sql = f"""COPY INTO {self._table_name}{storage_cred}
FROM {location}
diff --git a/airflow/providers/databricks/sensors/databricks_partition.py b/airflow/providers/databricks/sensors/databricks_partition.py
index 1aea220078..9c74891018 100644
--- a/airflow/providers/databricks/sensors/databricks_partition.py
+++ b/airflow/providers/databricks/sensors/databricks_partition.py
@@ -140,7 +140,7 @@ class DatabricksPartitionSensor(BaseSensorOperator):
if self.table_name.split(".")[0] == "delta":
_fully_qualified_table_name = self.table_name
else:
- _fully_qualified_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
+ _fully_qualified_table_name = f"{self.catalog}.{self.schema}.{self.table_name}"
self.log.debug("Table name generated from arguments: %s", _fully_qualified_table_name)
_joiner_val = " AND "
_prefix = f"SELECT 1 FROM {_fully_qualified_table_name} WHERE"
diff --git a/airflow/providers/dbt/cloud/operators/dbt.py b/airflow/providers/dbt/cloud/operators/dbt.py
index c0b37c7fdd..2b37fa144a 100644
--- a/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/airflow/providers/dbt/cloud/operators/dbt.py
@@ -142,7 +142,7 @@ class DbtCloudRunJobOperator(BaseOperator):
if self.wait_for_termination and isinstance(self.run_id, int):
if self.deferrable is False:
- self.log.info("Waiting for job run %s to terminate.", str(self.run_id))
+ self.log.info("Waiting for job run %s to terminate.", self.run_id)
if self.hook.wait_for_job_run_status(
run_id=self.run_id,
@@ -151,7 +151,7 @@ class DbtCloudRunJobOperator(BaseOperator):
check_interval=self.check_interval,
timeout=self.timeout,
):
- self.log.info("Job run %s has completed successfully.", str(self.run_id))
+ self.log.info("Job run %s has completed successfully.", self.run_id)
else:
raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
@@ -173,7 +173,7 @@ class DbtCloudRunJobOperator(BaseOperator):
method_name="execute_complete",
)
elif job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
- self.log.info("Job run %s has completed successfully.", str(self.run_id))
+ self.log.info("Job run %s has completed successfully.", self.run_id)
return self.run_id
elif job_run_status in (
DbtCloudJobRunStatus.CANCELLED.value,
@@ -211,7 +211,7 @@ class DbtCloudRunJobOperator(BaseOperator):
check_interval=self.check_interval,
timeout=self.timeout,
):
- self.log.info("Job run %s has been cancelled successfully.", str(self.run_id))
+ self.log.info("Job run %s has been cancelled successfully.", self.run_id)
@cached_property
def hook(self):
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 9a44a31749..7a8a419921 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -143,7 +143,7 @@ class DockerSwarmOperator(DockerOperator):
)
if self.service is None:
raise Exception("Service should be set here")
- self.log.info("Service started: %s", str(self.service))
+ self.log.info("Service started: %s", self.service)
# wait for the service to start the task
while not self.cli.tasks(filters={"service": self.service["ID"]}):
diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py b/airflow/providers/google/cloud/hooks/cloud_sql.py
index 059786f551..6e8f7774fc 100644
--- a/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -954,7 +954,7 @@ class CloudSQLDatabaseHook(BaseHook):
def _get_sqlproxy_instance_specification(self) -> str:
instance_specification = self._get_instance_socket_name()
if self.sql_proxy_use_tcp:
- instance_specification += "=tcp:" + str(self.sql_proxy_tcp_port)
+ instance_specification += f"=tcp:{self.sql_proxy_tcp_port}"
return instance_specification
def create_connection(self) -> Connection:
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 3b0f3a2a8c..111ef3a836 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -426,7 +426,7 @@ class _DataflowJobsController(LoggingMixin):
if current_state in DataflowJobStatus.AWAITING_STATES:
return self._wait_until_finished is False
- self.log.debug("Current job: %s", str(job))
+ self.log.debug("Current job: %s", job)
raise Exception(
f"Google Cloud Dataflow job {job['name']} is in an unexpected terminal state: {current_state}, "
f"expected terminal state: {self._expected_terminal_state}"
@@ -896,7 +896,7 @@ class DataflowHook(GoogleBaseHook):
)
if append_job_name:
- safe_job_name = base_job_name + "-" + str(uuid.uuid4())[:8]
+ safe_job_name = f"{base_job_name}-{uuid.uuid4()!s:.8}"
else:
safe_job_name = base_job_name
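
On the dataflow.py hunk above (and the matching edits in operators/dataflow.py, dataproc.py and the AWS system-test utilities below): the !s conversion plus a .8 precision in the format spec keeps only the first eight characters of the UUID's string form, equivalent to the old str(uuid.uuid4())[:8]. A quick illustration with a fixed UUID so the output is reproducible (the job name is made up):

    import uuid

    u = uuid.UUID("12345678-1234-5678-1234-567812345678")
    print(f"my-job-{u!s:.8}")      # my-job-12345678
    print(f"my-job-{str(u)[:8]}")  # my-job-12345678 (old spelling, same result)
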
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 059cbb69fd..59b2715886 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -899,7 +899,7 @@ class DataflowStartFlexTemplateOperator(GoogleCloudBaseOperator):
job_body = self.body.get("launch_parameter") or self.body.get("launchParameter")
job_name = job_body.get("jobName")
if job_name:
- job_name += f"-{str(uuid.uuid4())[:8]}"
+ job_name += f"-{uuid.uuid4()!s:.8}"
job_body["jobName"] = job_name
self.log.info("Job name was changed to %s", job_name)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index e8957b5431..3d41fdd1be 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -615,7 +615,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
if time_left < 0:
raise AirflowException(f"Cluster {self.cluster_name} is still DELETING state, aborting")
time.sleep(time_to_sleep)
- time_left = time_left - time_to_sleep
+ time_left -= time_to_sleep
try:
self._get_cluster(hook)
except NotFound:
@@ -630,7 +630,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
if time_left < 0:
raise AirflowException(f"Cluster {self.cluster_name} is still CREATING state, aborting")
time.sleep(time_to_sleep)
- time_left = time_left - time_to_sleep
+ time_left -= time_to_sleep
cluster = self._get_cluster(hook)
return cluster
@@ -1599,7 +1599,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
@staticmethod
def _generate_temp_filename(filename):
- return f"{time:%Y%m%d%H%M%S}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}"
+ return f"{time:%Y%m%d%H%M%S}_{uuid.uuid4()!s:.8}_{ntpath.basename(filename)}"
def _upload_file_temp(self, bucket, local_file):
"""Upload a local file to a Google Cloud Storage bucket."""
diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py b/airflow/providers/google/cloud/sensors/bigquery_dts.py
index b4926b3b95..55cc6dc4ae 100644
--- a/airflow/providers/google/cloud/sensors/bigquery_dts.py
+++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py
@@ -137,7 +137,7 @@ class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
timeout=self.request_timeout,
metadata=self.metadata,
)
- self.log.info("Status of %s run: %s", self.run_id, str(run.state))
+ self.log.info("Status of %s run: %s", self.run_id, run.state)
if run.state in (TransferState.FAILED, TransferState.CANCELLED):
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
diff --git a/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py b/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
index 758bd818ca..075066c600 100644
--- a/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
@@ -199,7 +199,7 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
else:
message = (
"FlushAction not found in the data. Please check the FlushAction in "
- "the operator. Converted Rows with Action: " + str(converted_rows_with_action)
+ f"the operator. Converted Rows with Action: {converted_rows_with_action}"
)
raise AirflowException(message)
return total_data_count
diff --git a/airflow/providers/google/leveldb/operators/leveldb.py b/airflow/providers/google/leveldb/operators/leveldb.py
index 9471e9a131..2d544e89b4 100644
--- a/airflow/providers/google/leveldb/operators/leveldb.py
+++ b/airflow/providers/google/leveldb/operators/leveldb.py
@@ -88,7 +88,7 @@ class LevelDBOperator(BaseOperator):
keys=self.keys,
values=self.values,
)
- self.log.info("Done. Returned value was: %s", str(value))
+ self.log.info("Done. Returned value was: %s", value)
leveldb_hook.close_conn()
str_value = value if value is None else value.decode()
return str_value
diff --git a/airflow/providers/grpc/hooks/grpc.py b/airflow/providers/grpc/hooks/grpc.py
index 843b23bd60..a3ea7530cf 100644
--- a/airflow/providers/grpc/hooks/grpc.py
+++ b/airflow/providers/grpc/hooks/grpc.py
@@ -83,7 +83,7 @@ class GrpcHook(BaseHook):
base_url = self.conn.host
if self.conn.port:
- base_url = base_url + ":" + str(self.conn.port)
+ base_url += f":{self.conn.port}"
auth_type = self._get_field("auth_type")
diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py
index 6d58847922..1a8e5f7aeb 100644
--- a/airflow/providers/http/hooks/http.py
+++ b/airflow/providers/http/hooks/http.py
@@ -103,10 +103,10 @@ class HttpHook(BaseHook):
# schema defaults to HTTP
schema = conn.schema if conn.schema else "http"
host = conn.host if conn.host else ""
- self.base_url = schema + "://" + host
+ self.base_url = f"{schema}://{host}"
if conn.port:
- self.base_url = self.base_url + ":" + str(conn.port)
+ self.base_url += f":{conn.port}"
if conn.login:
session.auth = self.auth_type(conn.login, conn.password)
elif self._auth_type:
@@ -329,7 +329,7 @@ class HttpAsyncHook(BaseHook):
self.base_url = schema + "://" + host
if conn.port:
- self.base_url = self.base_url + ":" + str(conn.port)
+ self.base_url += f":{conn.port}"
if conn.login:
auth = self.auth_type(conn.login, conn.password)
if conn.extra:
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 25f864f6ff..941462c2da 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -147,7 +147,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
blob_names = self.hook.get_blobs_list(container_name=self.wasb_container, prefix=prefix)
except HttpResponseError as e:
messages.append(f"tried listing blobs with prefix={prefix} and container={self.wasb_container}")
- messages.append("could not list blobs " + str(e))
+ messages.append(f"could not list blobs {e}")
self.log.exception("can't list blobs")
if blob_names:
diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
index 4d35c6ab9c..d77a9d5fac 100644
--- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
+++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -125,7 +125,7 @@ class SFTPToWasbOperator(BaseOperator):
sftp_complete_path, prefix=prefix, delimiter=delimiter
)
- self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path)
+ self.log.info("Found %d files at sftp source path: %s", len(found_files), self.sftp_source_path)
for file in found_files:
future_blob_name = self.get_full_path_blob(file)
diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py
index 8e55aab81a..e9c333a347 100644
--- a/airflow/providers/oracle/hooks/oracle.py
+++ b/airflow/providers/oracle/hooks/oracle.py
@@ -193,9 +193,9 @@ class OracleHook(DbApiHook):
if dsn is None:
dsn = conn.host
if conn.port is not None:
- dsn += ":" + str(conn.port)
+ dsn += f":{conn.port}"
if service_name:
- dsn += "/" + service_name
+ dsn += f"/{service_name}"
elif conn.schema:
warnings.warn(
"""Using conn.schema to pass the Oracle Service Name is deprecated.
@@ -203,7 +203,7 @@ class OracleHook(DbApiHook):
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- dsn += "/" + conn.schema
+ dsn += f"/{conn.schema}"
conn_config["dsn"] = dsn
if "events" in conn.extra_dejson:
@@ -298,7 +298,7 @@ class OracleHook(DbApiHook):
elif cell is None or isinstance(cell, float) and math.isnan(cell): # coerce numpy NaN to NULL
lst.append("NULL")
elif np and isinstance(cell, np.datetime64):
- lst.append("'" + str(cell) + "'")
+ lst.append(f"'{cell}'")
elif isinstance(cell, datetime):
lst.append(f"to_date('{cell:%Y-%m-%d %H:%M:%S}','YYYY-MM-DD HH24:MI:SS')")
else:
diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py
index a93791e1a1..e1a1ca0f37 100644
--- a/tests/providers/amazon/aws/operators/test_emr_serverless.py
+++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py
@@ -664,7 +664,7 @@ class TestEmrServerlessStartJobOperator:
executionRoleArn=execution_role_arn,
jobDriver=job_driver,
configurationOverrides=configuration_overrides,
- name=f"emr_serverless_job_airflow_{str(UUID(generated_name_uuid, version=4))}",
+ name=f"emr_serverless_job_airflow_{UUID(generated_name_uuid, version=4)}",
)
@mock.patch.object(EmrServerlessHook, "get_waiter")
diff --git a/tests/providers/microsoft/azure/operators/test_asb.py b/tests/providers/microsoft/azure/operators/test_asb.py
index 9608a65c85..774d8a071d 100644
--- a/tests/providers/microsoft/azure/operators/test_asb.py
+++ b/tests/providers/microsoft/azure/operators/test_asb.py
@@ -40,7 +40,7 @@ from airflow.providers.microsoft.azure.operators.asb import (
QUEUE_NAME = "test_queue"
MESSAGE = "Test Message"
-MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(10)]
+MESSAGE_LIST = [f"{MESSAGE} {n}" for n in range(10)]
OWNER_NAME = "airflow"
DAG_ID = "test_azure_service_bus_subscription"
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index 7c78f1bd0b..6515fe6552 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -40,7 +40,7 @@ ENV_ID_ENVIRON_KEY: str = "SYSTEM_TESTS_ENV_ID"
ENV_ID_KEY: str = "ENV_ID"
DEFAULT_ENV_ID_PREFIX: str = "env"
DEFAULT_ENV_ID_LEN: int = 8
-DEFAULT_ENV_ID: str = f"{DEFAULT_ENV_ID_PREFIX}{str(uuid4())[:DEFAULT_ENV_ID_LEN]}"
+DEFAULT_ENV_ID: str = f"{DEFAULT_ENV_ID_PREFIX}{uuid4()!s:.{DEFAULT_ENV_ID_LEN}}"
PURGE_LOGS_INTERVAL_PERIOD = 5
# All test file names will contain this string.
diff --git a/tests/system/providers/docker/example_docker_copy_data.py b/tests/system/providers/docker/example_docker_copy_data.py
index 7a3e59f966..c7b9e2f0a5 100644
--- a/tests/system/providers/docker/example_docker_copy_data.py
+++ b/tests/system/providers/docker/example_docker_copy_data.py
@@ -79,8 +79,8 @@ with models.DAG(
"/bin/bash",
"-c",
"/bin/sleep 30; "
- "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};"
- "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';",
+ f"/bin/mv {{{{ params.source_location }}}}/{t_view.output} {{{{ params.target_location }}}};"
+ f"/bin/echo '{{{{ params.target_location }}}}/{t_view.output}';",
],
task_id="move_data",
do_xcom_push=True,