Posted to commits@airflow.apache.org by hu...@apache.org on 2024/02/29 17:46:15 UTC
(airflow) branch main updated: Avoid non-recommended usage of logging (#37792)
This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 77341ef6a1 Avoid non-recommended usage of logging (#37792)
77341ef6a1 is described below
commit 77341ef6a1e4ffa3f8d3275eade325c89f2c95f2
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Thu Feb 29 21:46:08 2024 +0400
Avoid non-recommended usage of logging (#37792)
---
airflow/dag_processing/processor.py | 2 +-
airflow/jobs/backfill_job_runner.py | 6 ++----
airflow/models/variable.py | 8 ++++++--
airflow/providers/amazon/aws/operators/sagemaker.py | 10 +++++-----
.../providers/cncf/kubernetes/executors/kubernetes_executor.py | 3 +--
airflow/providers/google/cloud/operators/dataproc.py | 2 +-
airflow/utils/db_cleanup.py | 2 +-
airflow/utils/log/task_context_logger.py | 2 +-
pyproject.toml | 10 ++++++++--
9 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index ce4c51552b..99c9656826 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -177,7 +177,7 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
# gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
# necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
- StreamLogWriter(log, logging.WARN)
+ StreamLogWriter(log, logging.WARNING)
), Stats.timer() as timer:
_handle_dag_file_processing()
log.info("Processing %s took %.3f seconds", file_path, timer.duration)
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 7e57979ccd..6be72b2c95 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -681,11 +681,10 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
try:
session.commit()
except OperationalError:
- self.log.error(
+ self.log.exception(
"Failed to commit task state due to operational error. "
"The job will retry this operation so if your backfill succeeds, "
"you can safely ignore this message.",
- exc_info=True,
)
session.rollback()
if i == max_attempts - 1:
@@ -986,10 +985,9 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
# state to failed.
self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
except OperationalError:
- self.log.error(
+ self.log.exception(
"Backfill job dead-locked. The job will retry the job so it is likely "
"to heal itself. If your backfill succeeds you can ignore this exception.",
- exc_info=True,
)
raise
finally:
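The change above replaces log.error(..., exc_info=True) with log.exception(...). Inside an except block the two are equivalent: exception() logs at ERROR level and attaches the active traceback. A minimal sketch of the idiom, names hypothetical:

    import logging

    log = logging.getLogger(__name__)

    def commit_with_retry_logging(session):
        try:
            session.commit()
        except Exception:
            # Same effect as log.error(msg, exc_info=True), in one idiomatic call
            log.exception("Failed to commit task state; the job will retry.")
            session.rollback()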
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index bfd70ae4c2..e79a44c20e 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -242,11 +242,15 @@ class Variable(Base, LoggingMixin):
try:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
+ _backend_name = type(secrets_backend).__name__
log.warning(
- "The variable {key} is defined in the {cls} secrets backend, which takes "
+ "The variable %s is defined in the %s secrets backend, which takes "
"precedence over reading from the database. The value in the database will be "
"updated, but to read it you have to delete the conflicting variable "
- "from {cls}".format(key=key, cls=secrets_backend.__class__.__name__)
+ "from %s",
+ key,
+ _backend_name,
+ _backend_name,
)
return
except Exception:
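This hunk moves interpolation out of the message string and into logger arguments. With %-style placeholders the formatting is deferred until the record is actually emitted, and log aggregators see a single message template rather than one rendered string per variable. A small sketch, values hypothetical:

    import logging

    log = logging.getLogger(__name__)
    key, backend = "my_var", "EnvironmentVariablesBackend"  # hypothetical values

    # Eager: the string is rendered even if WARNING records are filtered out
    log.warning("The variable {} is defined in {}".format(key, backend))

    # Lazy: the logger interpolates only when the record is emitted
    log.warning("The variable %s is defined in %s", key, backend)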
diff --git a/airflow/providers/amazon/aws/operators/sagemaker.py b/airflow/providers/amazon/aws/operators/sagemaker.py
index 3d6b1c2532..91b4200f18 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -349,13 +349,13 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
for processing_input in processing_inputs:
inputs.append(self.path_to_s3_dataset(processing_input["S3Input"]["S3Uri"]))
except KeyError:
- self.log.exception("Cannot find S3 input details", exc_info=True)
+ self.log.exception("Cannot find S3 input details")
try:
for processing_output in processing_outputs:
outputs.append(self.path_to_s3_dataset(processing_output["S3Output"]["S3Uri"]))
except KeyError:
- self.log.exception("Cannot find S3 output details.", exc_info=True)
+ self.log.exception("Cannot find S3 output details.")
return inputs, outputs
@@ -777,7 +777,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
try:
model_package_arn = self.serialized_model["PrimaryContainer"]["ModelPackageName"]
except KeyError:
- self.log.error("Cannot find Model Package Name.", exc_info=True)
+ self.log.exception("Cannot find Model Package Name.")
try:
transform_input = self.serialized_transform["TransformInput"]["DataSource"]["S3DataSource"][
@@ -785,7 +785,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
]
transform_output = self.serialized_transform["TransformOutput"]["S3OutputPath"]
except KeyError:
- self.log.error("Cannot find some required input/output details.", exc_info=True)
+ self.log.exception("Cannot find some required input/output details.")
inputs = []
@@ -813,7 +813,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
for container in model_containers:
model_data_urls.append(container["ModelDataUrl"])
except KeyError:
- self.log.exception("Cannot retrieve model details.", exc_info=True)
+ self.log.exception("Cannot retrieve model details.")
return model_data_urls
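The SageMaker hunks clean up two variants of the same issue: log.exception(..., exc_info=True), where the flag is redundant because exception() always records the traceback, and log.error(..., exc_info=True), which exception() expresses in one call. A hedged sketch with a hypothetical lookup:

    from __future__ import annotations

    import logging

    log = logging.getLogger(__name__)

    def s3_uri(config: dict) -> str | None:
        try:
            return config["S3Input"]["S3Uri"]
        except KeyError:
            # exception() implies exc_info=True, so passing it explicitly is redundant
            log.exception("Cannot find S3 input details.")
            return None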
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 66fe9c2f50..c023bbf198 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -431,10 +431,9 @@ class KubernetesExecutor(BaseExecutor):
self.kube_scheduler.run_next(task)
self.task_publish_retries.pop(key, None)
except PodReconciliationError as e:
- self.log.error(
+ self.log.exception(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
"Try clearing the task to re-run.",
- exc_info=True,
)
self.fail(task[0], e)
except ApiException as e:
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 8ea5b57864..6c273b92a8 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -835,7 +835,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
except AirflowException as ae_inner:
# We could get any number of failures here, including cluster not found and we
# can just ignore to ensure we surface the original cluster create failure
- self.log.error(ae_inner, exc_info=True)
+ self.log.exception(ae_inner)
finally:
raise ae
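Here the exception object itself serves as the log message; switching to exception() keeps str(ae_inner) as the message, adds the traceback automatically, and drops the explicit exc_info flag. A minimal sketch, error hypothetical:

    import logging

    log = logging.getLogger(__name__)

    try:
        raise RuntimeError("cluster diagnostics failed")  # hypothetical inner failure
    except RuntimeError as inner:
        # str(inner) becomes the message; the traceback is appended automatically
        log.exception(inner)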
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 0b8c6c35e9..04dfe79c07 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -49,7 +49,7 @@ if TYPE_CHECKING:
from airflow.models import Base
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"
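getLogger(__file__) keys the logger by filesystem path, which varies with the install location and sits outside the dotted-name hierarchy that logging configuration relies on; getLogger(__name__) uses the module path, so configuration applied to the parent "airflow" logger propagates. A quick illustration, example paths hypothetical:

    import logging

    # Path-based name, e.g. "/opt/airflow/utils/db_cleanup.py" (hypothetical),
    # outside any package logger hierarchy
    path_logger = logging.getLogger(__file__)

    # Dotted module name, e.g. "airflow.utils.db_cleanup", so settings on the
    # "airflow" logger propagate to it
    module_logger = logging.getLogger(__name__)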
diff --git a/airflow/utils/log/task_context_logger.py b/airflow/utils/log/task_context_logger.py
index 46e8cf8cee..07a5ed74e8 100644
--- a/airflow/utils/log/task_context_logger.py
+++ b/airflow/utils/log/task_context_logger.py
@@ -143,7 +143,7 @@ class TaskContextLogger:
:param msg: the message to relay to task context log
:param ti: the task instance
"""
- self._log(logging.WARN, msg, *args, ti=ti)
+ self._log(logging.WARNING, msg, *args, ti=ti)
def warning(self, msg: str, *args, ti: TaskInstance):
"""
diff --git a/pyproject.toml b/pyproject.toml
index 4912b57c00..d8266b57c7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1303,8 +1303,14 @@ namespace-packages = ["airflow/providers"]
[tool.ruff.lint]
typing-modules = ["airflow.typing_compat"]
extend-select = [
+ # Enable entire ruff rule section
"I", # Missing required import (auto-fixable)
"UP", # Pyupgrade
+ "ISC", # Checks for implicit literal string concatenation (auto-fixable)
+ "TCH", # Rules around TYPE_CHECKING blocks
+ "G", # flake8-logging-format rules
+ "LOG", # flake8-logging rules, most of them autofixable
+ # Per rule enables
"RUF100", # Unused noqa (auto-fixable)
# We ignore more pydocstyle than we enable, so be more selective at what we enable
"D101",
@@ -1321,10 +1327,8 @@ extend-select = [
"D403",
"D412",
"D419",
- "TCH", # Rules around TYPE_CHECKING blocks
"TID251", # Specific modules or module members that may not be imported or accessed
"TID253", # Ban certain modules from being imported at module level
- "ISC", # Checks for implicit literal string concatenation (auto-fixable)
"B006", # Checks for uses of mutable objects as function argument defaults.
"PT001",
"PT003",
@@ -1337,6 +1341,8 @@ extend-select = [
"PT027",
]
ignore = [
+ "G003", # Logging statement uses + (not fixed yet)
+ "G004", # Logging statement uses f-string (not fixed yet)
"D203",
"D212",
"D213",