You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 22:32:57 UTC
[airflow] branch v1-10-test updated: [AIRFLOW-6708] Set unique logger names (#7330)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 722b465 [AIRFLOW-6708] Set unique logger names (#7330)
722b465 is described below
commit 722b46523f7715aef79640aa1626ff705454895f
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Sun Feb 2 20:48:03 2020 +0100
[AIRFLOW-6708] Set unique logger names (#7330)
(cherry picked from commit cf141506a25dbba279b85500d781f7e056540721)
---
.pre-commit-config.yaml | 6 ++++
airflow/__init__.py | 3 +-
airflow/api/__init__.py | 9 +++---
airflow/api/auth/backend/kerberos_auth.py | 14 ++++-----
airflow/api/common/experimental/delete_dag.py | 7 +++--
airflow/bin/cli.py | 2 +-
airflow/config_templates/default_celery.py | 4 +--
airflow/configuration.py | 4 +--
.../auth/backends/github_enterprise_auth.py | 4 +--
airflow/contrib/auth/backends/google_auth.py | 4 +--
airflow/contrib/auth/backends/ldap_auth.py | 18 +++++------
airflow/contrib/auth/backends/password_auth.py | 8 ++---
.../example_dags/example_kubernetes_operator.py | 12 ++++----
airflow/contrib/hooks/bigquery_hook.py | 7 +++--
airflow/contrib/hooks/cloudant_hook.py | 5 ++--
airflow/contrib/hooks/gcp_api_base_hook.py | 8 ++---
airflow/contrib/hooks/gcp_mlengine_hook.py | 6 ++--
airflow/contrib/hooks/qubole_check_hook.py | 8 +++--
airflow/contrib/hooks/qubole_hook.py | 6 ++--
airflow/contrib/hooks/salesforce_hook.py | 11 +++----
airflow/contrib/operators/mlengine_operator.py | 5 ++--
airflow/contrib/utils/sendgrid.py | 9 +++---
airflow/executors/__init__.py | 5 ++--
airflow/executors/celery_executor.py | 12 ++++----
airflow/executors/kubernetes_executor.py | 2 +-
airflow/hooks/base_hook.py | 4 ++-
airflow/hooks/webhdfs_hook.py | 6 ++--
airflow/lineage/__init__.py | 8 +++--
airflow/models/crypto.py | 6 ++--
airflow/models/dag.py | 5 ++--
airflow/models/serialized_dag.py | 5 ++--
airflow/models/variable.py | 9 +++---
airflow/models/xcom.py | 4 +--
airflow/security/kerberos.py | 5 ++--
airflow/sensors/hdfs_sensor.py | 8 ++---
airflow/sentry.py | 7 ++---
airflow/serialization/serialized_objects.py | 10 +++----
airflow/utils/dag_processing.py | 3 +-
airflow/utils/db.py | 4 +--
airflow/utils/email.py | 5 ++--
airflow/utils/log/es_task_handler.py | 2 +-
airflow/utils/log/logging_mixin.py | 2 +-
airflow/utils/sqlalchemy.py | 4 +--
airflow/www/api/experimental/endpoints.py | 35 +++++++++++-----------
airflow/www/app.py | 5 ++--
airflow/www/views.py | 9 +++---
airflow/www_rbac/api/experimental/endpoints.py | 32 ++++++++++----------
.../test_gcp_sql_operator_system_helper.py | 11 +++----
tests/contrib/utils/gcp_authenticator.py | 16 +++++-----
tests/contrib/utils/logging_command_executor.py | 20 ++++++-------
tests/security/test_kerberos.py | 4 +--
51 files changed, 215 insertions(+), 193 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ed27b09..e7b90db 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -286,6 +286,12 @@ repos:
language: system
pass_filenames: false
files: ^common/_common_values.sh$|^breeze-complete$
+ - id: incorrect-use-of-LoggingMixin
+ language: pygrep
+ name: Make sure LoggingMixin is not used alone
+ entry: "LoggingMixin\\(\\)"
+ files: \.py$
+ pass_filenames: true
- id: build
name: Check if image build is needed
entry: ./scripts/ci/pre_commit_ci_build.sh 3.5 false
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 3ac543f..c8dcd21 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -32,6 +32,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
__version__ = version.version
+import logging
import sys
# flake8: noqa: F401
@@ -47,10 +48,10 @@ from airflow.exceptions import AirflowException
settings.initialize()
login = None # type: Any
+log = logging.getLogger(__name__)
def load_login():
- log = LoggingMixin().log
auth_backend = 'airflow.default_login'
try:
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 2feb7f4..3750752 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -21,15 +21,16 @@
from __future__ import print_function
from importlib import import_module
+import logging
import warnings
import lazy_object_proxy
from zope.deprecation import deprecated
-from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException, AirflowException
-from airflow.utils.log.logging_mixin import LoggingMixin
+log = logging.getLogger(__name__)
class ApiAuth: # pylint: disable=too-few-public-methods
@@ -40,8 +41,6 @@ class ApiAuth: # pylint: disable=too-few-public-methods
API_AUTH = ApiAuth()
-LOG = LoggingMixin().log
-
def load_auth():
"""Loads authentication backend"""
@@ -67,7 +66,7 @@ def load_auth():
api_auth.client_auth = deprecated('use CLIENT_AUTH', api_auth.CLIENT_AUTH)
API_AUTH.api_auth = api_auth
except ImportError as err:
- LOG.critical(
+ log.critical(
"Cannot import %s for API authentication due to: %s",
auth_backend, err
)
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index 3e340f8..26f3309 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -44,6 +44,7 @@
from future.standard_library import install_aliases
+import logging
import os
from functools import wraps
@@ -61,15 +62,14 @@ import kerberos
from requests_kerberos import HTTPKerberosAuth
from airflow.configuration import conf
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
install_aliases()
# pylint: disable=c-extension-no-member
CLIENT_AUTH = HTTPKerberosAuth(service='airflow')
-LOG = LoggingMixin().log
-
class KerberosService: # pylint: disable=too-few-public-methods
"""Class to keep information about the Kerberos Service initialized """
@@ -87,7 +87,7 @@ def init_app(app):
hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
- LOG.info("Kerberos: hostname %s", hostname)
+ log.info("Kerberos: hostname %s", hostname)
service = 'airflow'
@@ -97,12 +97,12 @@ def init_app(app):
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
try:
- LOG.info("Kerberos init: %s %s", service, hostname)
+ log.info("Kerberos init: %s %s", service, hostname)
principal = kerberos.getServerPrincipalDetails(service, hostname)
except kerberos.KrbError as err:
- LOG.warning("Kerberos: %s", err)
+ log.warning("Kerberos: %s", err)
else:
- LOG.info("Kerberos API: server is %s", principal)
+ log.info("Kerberos API: server is %s", principal)
def _unauthorized():
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index db48d35..c6c10ff 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
"""Delete DAGs APIs."""
+import logging
from sqlalchemy import or_
@@ -26,7 +27,8 @@ from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.db import provide_session
from airflow.exceptions import DagNotFound
from airflow.settings import STORE_SERIALIZED_DAGS
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
@provide_session
@@ -39,8 +41,7 @@ def delete_dag(dag_id, keep_records_in_log=True, session=None):
:param session: session used
:return count of deleted dags
"""
- logger = LoggingMixin()
- logger.log.info("Deleting DAG: %s", dag_id)
+ log.info("Deleting DAG: %s", dag_id)
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if dag is None:
raise DagNotFound("Dag id {} not found".format(dag_id))
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 1ed8266..8334af6 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -95,7 +95,7 @@ api_module = import_module(conf.get('cli', 'api_client')) # type: Any
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.API_AUTH.api_auth.CLIENT_AUTH)
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
DAGS_FOLDER = settings.DAGS_FOLDER
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 026053f..00009da 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -17,18 +17,18 @@
# specific language governing permissions and limitations
# under the License.
"""Default celery configuration."""
+import logging
import ssl
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
-from airflow.utils.log.logging_mixin import LoggingMixin
def _broker_supports_visibility_timeout(url):
return url.startswith("redis://") or url.startswith("sqs://")
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
broker_url = conf.get('celery', 'BROKER_URL')
diff --git a/airflow/configuration.py b/airflow/configuration.py
index fe45a01..d912898 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,6 +28,7 @@ import copy
import errno
from future import standard_library
import multiprocessing
+import logging
import os
import shlex
import six
@@ -41,11 +42,10 @@ import yaml
from zope.deprecation import deprecated
from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
# show Airflow's deprecation warnings
if not sys.warnoptions:
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 2881922..0cfd617 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
import flask_login
# Need to expose these downstream
@@ -29,9 +30,8 @@ from flask_oauthlib.client import OAuth
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
def get_config_param(param):
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index 2ce23ae..cf14677 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
import flask_login
# Need to expose these downstream
@@ -29,9 +30,8 @@ from flask_oauthlib.client import OAuth
from airflow import models
from airflow.configuration import conf
from airflow.utils.db import provide_session
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
def get_config_param(param):
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 7368dea..c034105 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -17,33 +17,29 @@
# specific language governing permissions and limitations
# under the License.
from future.utils import native
+import logging
+import re
+import ssl
+import traceback
import flask_login
from flask_login import login_required, current_user, logout_user # noqa: F401
-from flask import flash
+from flask import flash, redirect, url_for
from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired
from ldap3 import Server, Connection, Tls, set_config_parameter, LEVEL, SUBTREE
-import ssl
-
-from flask import url_for, redirect
from airflow import models
-from airflow.configuration import conf
-from airflow.configuration import AirflowConfigException
+from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
-import traceback
-import re
-
-from airflow.utils.log.logging_mixin import LoggingMixin
LOGIN_MANAGER = flask_login.LoginManager()
LOGIN_MANAGER.login_view = 'airflow.login' # Calls login() below
LOGIN_MANAGER.login_message = None
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
class AuthenticationError(Exception):
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index e04a40d..7b2b9ef 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -20,6 +20,7 @@
from __future__ import unicode_literals
import base64
+import logging
from functools import wraps
from sys import version_info
@@ -41,14 +42,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
from airflow import models
from airflow.utils.db import provide_session, create_session
-from airflow.utils.log.logging_mixin import LoggingMixin
LOGIN_MANAGER = flask_login.LoginManager()
LOGIN_MANAGER.login_view = 'airflow.login' # Calls login() below
LOGIN_MANAGER.login_message = None
-LOG = LoggingMixin().log
PY3 = version_info[0] == 3
+log = logging.getLogger(__name__)
CLIENT_AUTH = None
@@ -119,7 +119,7 @@ class PasswordUser(models.User):
@provide_session
def load_user(userid, session=None):
"""Loads user from the database"""
- LOG.debug("Loading user %s", userid)
+ log.debug("Loading user %s", userid)
if not userid or userid == 'None':
return None
@@ -151,7 +151,7 @@ def authenticate(session, username, password):
if not user.authenticate(password):
raise AuthenticationError()
- LOG.info("User %s successfully authenticated", username)
+ log.info("User %s successfully authenticated", username)
return user
diff --git a/airflow/contrib/example_dags/example_kubernetes_operator.py b/airflow/contrib/example_dags/example_kubernetes_operator.py
index e945335..645b4b6 100644
--- a/airflow/contrib/example_dags/example_kubernetes_operator.py
+++ b/airflow/contrib/example_dags/example_kubernetes_operator.py
@@ -19,11 +19,13 @@
"""
This is an example dag for using the KubernetesPodOperator.
"""
-from airflow.utils.dates import days_ago
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+import logging
+
from airflow.models import DAG
+from airflow.utils.dates import days_ago
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
try:
# Kubernetes is optional, so not available in vanilla Airflow
@@ -64,6 +66,6 @@ try:
)
except ImportError as e:
- log.warning("Could not import KubernetesPodOperator: " + str(e))
- log.warning("Install kubernetes dependencies with: "
+ log.warning("Could not import KubernetesPodOperator: %s, ", str(e))
+ log.warning("Install kubernetes dependencies with: \n"
" pip install 'apache-airflow[kubernetes]'")
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 4948ca4..930d212 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -21,9 +21,9 @@
This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
-
-import time
+import logging
import six
+import time
from builtins import range
from copy import deepcopy
from six import iteritems
@@ -43,6 +43,8 @@ from pandas_gbq.gbq import \
_test_google_api_imports as gbq_test_google_api_imports
from pandas_gbq.gbq import GbqConnector
+log = logging.getLogger(__name__)
+
class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
"""
@@ -2233,7 +2235,6 @@ def _split_tablename(table_input, default_project_id, var_name=None):
if project_id is None:
if var_name is not None:
- log = LoggingMixin().log
log.info(
'Project not included in %s: %s; using project "%s"',
var_name, table_input, default_project_id
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index 5d39f3f..b160ae4 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
from past.builtins import unicode
@@ -23,7 +24,8 @@ import cloudant
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
class CloudantHook(BaseHook):
@@ -40,7 +42,6 @@ class CloudantHook(BaseHook):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
- log = LoggingMixin().log
log.debug(
'cloudant-python does not support unicode. Encoding %s as '
'ascii using "ignore".', s
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 3ba68e2..92899c3 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -35,11 +35,11 @@ from googleapiclient.errors import HttpError
import tenacity
from googleapiclient.http import set_user_agent
-from airflow import LoggingMixin, version
+from airflow import version
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-logger = LoggingMixin().log
+log = logging.getLogger(__name__)
_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
@@ -234,8 +234,8 @@ class GoogleCloudBaseHook(BaseHook):
default_kwargs = {
'wait': tenacity.wait_exponential(multiplier=1, max=100),
'retry': retry_if_temporary_quota(),
- 'before': tenacity.before_log(logger, logging.DEBUG),
- 'after': tenacity.after_log(logger, logging.DEBUG),
+ 'before': tenacity.before_log(log, logging.DEBUG),
+ 'after': tenacity.after_log(log, logging.DEBUG),
}
default_kwargs.update(**kwargs)
return tenacity.retry(
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 2ca0bf5..edf51e0 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -14,18 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
import random
import time
from googleapiclient.errors import HttpError
from googleapiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
- log = LoggingMixin().log
for i in range(0, max_n):
try:
diff --git a/airflow/contrib/hooks/qubole_check_hook.py b/airflow/contrib/hooks/qubole_check_hook.py
index 303c19b..137bab7 100644
--- a/airflow/contrib/hooks/qubole_check_hook.py
+++ b/airflow/contrib/hooks/qubole_check_hook.py
@@ -17,7 +17,8 @@
# specific language governing permissions and limitations
# under the License.
#
-from airflow.utils.log.logging_mixin import LoggingMixin
+import logging
+
from airflow.contrib.hooks.qubole_hook import QuboleHook
from airflow.exceptions import AirflowException
from qds_sdk.commands import Command
@@ -32,6 +33,9 @@ COL_DELIM = '\t'
ROW_DELIM = '\r\n'
+log = logging.getLogger(__name__)
+
+
def isint(value):
try:
int(value)
@@ -92,7 +96,6 @@ class QuboleCheckHook(QuboleHook):
cmd = Command.find(cmd_id)
if cmd is not None:
if cmd.status == 'running':
- log = LoggingMixin().log
log.info('Cancelling the Qubole Command Id: %s', cmd_id)
cmd.cancel()
@@ -104,7 +107,6 @@ class QuboleCheckHook(QuboleHook):
return record_list
def get_query_results(self):
- log = LoggingMixin().log
if self.cmd is not None:
cmd_id = self.cmd.id
log.info("command id: " + str(cmd_id))
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 37433a2..9facd2c 100644
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -18,6 +18,7 @@
# under the License.
#
"""Qubole hook"""
+import logging
import os
import time
import datetime
@@ -31,9 +32,11 @@ from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand,
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.configuration import conf, mkdir_p
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
+log = logging.getLogger(__name__)
+
+
COMMAND_CLASSES = {
"hivecmd": HiveCommand,
"prestocmd": PrestoCommand,
@@ -117,7 +120,6 @@ class QuboleHook(BaseHook):
if cmd_id is not None:
cmd = Command.find(cmd_id)
if cmd is not None:
- log = LoggingMixin().log
if cmd.status == 'done':
log.info('Command ID: %s has been succeeded, hence marking this '
'TI as Success.', cmd_id)
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index a1756b6..0759418 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -26,15 +26,17 @@ for other uses.
NOTE: this hook also relies on the simple_salesforce package:
https://github.com/simple-salesforce/simple-salesforce
"""
+import json
+import logging
+import time
+
from simple_salesforce import Salesforce
from airflow.hooks.base_hook import BaseHook
-import json
-
import pandas as pd
-import time
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
class SalesforceHook(BaseHook):
@@ -176,7 +178,6 @@ class SalesforceHook(BaseHook):
try:
col = pd.to_datetime(col)
except ValueError:
- log = LoggingMixin().log
log.warning(
"Could not convert field to timestamps: %s", col.name
)
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index d3f36ef..687cff3 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
import re
from googleapiclient.errors import HttpError
@@ -24,9 +24,8 @@ from airflow.contrib.hooks.gcp_mlengine_hook import MLEngineHook
from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
def _normalize_mlengine_job_id(job_id):
diff --git a/airflow/contrib/utils/sendgrid.py b/airflow/contrib/utils/sendgrid.py
index a185467..be0e249 100644
--- a/airflow/contrib/utils/sendgrid.py
+++ b/airflow/contrib/utils/sendgrid.py
@@ -23,6 +23,7 @@ from __future__ import print_function
from __future__ import unicode_literals
import base64
+import logging
import mimetypes
import os
@@ -31,7 +32,8 @@ from sendgrid.helpers.mail import Attachment, Content, Email, Mail, \
Personalization, CustomArg, Category
from airflow.utils.email import get_email_address_list
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
def send_email(to, subject, html_content, files=None,
@@ -104,9 +106,8 @@ def send_email(to, subject, html_content, files=None,
def _post_sendgrid_mail(mail_data):
- log = LoggingMixin().log
- sg = sendgrid.SendGridAPIClient(apikey=os.environ.get('SENDGRID_API_KEY'))
- response = sg.client.mail.send.post(request_body=mail_data)
+ sendgrid_client = sendgrid.SendGridAPIClient(api_key=os.environ.get('SENDGRID_API_KEY'))
+ response = sendgrid_client.client.mail.send.post(request_body=mail_data)
# 2xx status code.
if response.status_code >= 200 and response.status_code < 300:
log.info('Email with subject %s is successfully sent to recipients: %s' %
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index b1b9951..948bf33 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -16,9 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
import sys
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor # noqa
@@ -26,6 +25,7 @@ from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
DEFAULT_EXECUTOR = None
+log = logging.getLogger(__name__)
def _integrate_plugins():
@@ -47,7 +47,6 @@ def get_default_executor():
DEFAULT_EXECUTOR = _get_executor(executor_name)
- log = LoggingMixin().log
log.info("Using executor %s", executor_name)
return DEFAULT_EXECUTOR
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index a8775c4..2768030 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -16,7 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+"""Celery executor."""
+import logging
import math
import os
import subprocess
@@ -31,10 +32,11 @@ from airflow.configuration import conf
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
from airflow.utils.timeout import timeout
+log = logging.getLogger(__name__)
+
# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
@@ -61,7 +63,7 @@ app = Celery(
@app.task
def execute_command(command_to_exec):
- log = LoggingMixin().log
+ """Executes command."""
if command_to_exec[0:2] != ["airflow", "run"]:
raise ValueError('The command must start with ["airflow", "run"].')
log.info("Executing command in Celery: %s", command_to_exec)
@@ -224,7 +226,7 @@ class CeleryExecutor(BaseExecutor):
for key, command, result in key_and_async_results:
if isinstance(result, ExceptionWithTraceback):
- self.log.error(
+ self.log.error( # pylint: disable=logging-not-lazy
CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
)
elif result is not None:
@@ -263,7 +265,7 @@ class CeleryExecutor(BaseExecutor):
for key_and_state in task_keys_to_states:
if isinstance(key_and_state, ExceptionWithTraceback):
- self.log.error(
+ self.log.error( # pylint: disable=logging-not-lazy
CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:%s\n%s\n",
repr(key_and_state.exception), key_and_state.traceback
)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index e014aa3..8f87c08 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -791,7 +791,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.kube_scheduler.run_next(task)
except ApiException as e:
self.log.warning('ApiException when attempting to run task, re-queueing. '
- 'Message: %s' % json.loads(e.body)['message'])
+ 'Message: %s', json.loads(e.body)['message'])
self.task_queue.put(task)
except HTTPError as e:
self.log.warning('HTTPError when attempting to run task, re-queueing. '
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index 0d7e39f..16a18a8 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -23,6 +23,7 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
+import logging
import os
import random
from typing import List
@@ -35,6 +36,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
+log = logging.getLogger(__name__)
+
class BaseHook(LoggingMixin):
"""
@@ -83,7 +86,6 @@ class BaseHook(LoggingMixin):
def get_connection(cls, conn_id): # type: (str) -> Connection
conn = random.choice(list(cls.get_connections(conn_id)))
if conn.host:
- log = LoggingMixin().log
log.info("Using connection to: %s", conn.log_info())
return conn
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index 6d260ee..262f61a 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -16,20 +16,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
from hdfs import InsecureClient, HdfsError
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+log = logging.getLogger(__name__)
_kerberos_security_mode = conf.get("core", "security") == "kerberos"
if _kerberos_security_mode:
try:
from hdfs.ext.kerberos import KerberosClient
except ImportError:
- log = LoggingMixin().log
log.error("Could not load the Kerberos extension for the WebHDFSHook.")
raise
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index f444139..370e9c2 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -16,11 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+Provides lineage support functions
+"""
+import logging
+
from functools import wraps
from airflow.configuration import conf
from airflow.lineage.datasets import DataSet
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
from itertools import chain
@@ -28,7 +32,7 @@ from itertools import chain
PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
def _get_backend():
diff --git a/airflow/models/crypto.py b/airflow/models/crypto.py
index abc7d2d..79e6ab3 100644
--- a/airflow/models/crypto.py
+++ b/airflow/models/crypto.py
@@ -19,9 +19,12 @@
from builtins import ImportError as BuiltinImportError
+import logging
+
from airflow.configuration import conf
from airflow.exceptions import AirflowException
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
class InvalidFernetToken(Exception):
@@ -62,7 +65,6 @@ def get_fernet():
:raises: airflow.exceptions.AirflowException if there's a problem trying to load Fernet
"""
global _fernet
- log = LoggingMixin().log
if _fernet:
return _fernet
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 933dc10..54bc87e 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -21,6 +21,7 @@ from __future__ import print_function
import copy
import functools
+import logging
import os
import pickle
import re
@@ -68,6 +69,8 @@ if TYPE_CHECKING:
install_aliases()
+log = logging.getLogger(__name__)
+
ScheduleInterval = Union[str, timedelta, relativedelta]
@@ -1601,7 +1604,6 @@ class DAG(BaseDag, LoggingMixin):
:type expiration_date: datetime
:return: None
"""
- log = LoggingMixin().log
for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
@@ -1899,7 +1901,6 @@ class DagModel(Base):
:param alive_dag_filelocs: file paths of alive DAGs
:param session: ORM Session
"""
- log = LoggingMixin().log
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ",
cls.__tablename__)
dag_models = session.query(cls).all()
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 9709dba..c1a647a 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -19,6 +19,7 @@
"""Serialzed DAG table in database."""
+import logging
from datetime import timedelta
from typing import Any, Optional
@@ -32,11 +33,9 @@ from airflow.models.dagcode import DagCode
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import json
from airflow.utils import db, timezone
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
-
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
class SerializedDagModel(Base):
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 1e8fc78..49fc2b4 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -46,18 +46,17 @@ class Variable(Base, LoggingMixin):
return '{} : {}'.format(self.key, self._val)
def get_val(self):
- log = LoggingMixin().log
if self._val is not None and self.is_encrypted:
try:
fernet = get_fernet()
return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
except InvalidFernetToken:
- log.error("Can't decrypt _val for key={}, invalid token "
- "or value".format(self.key))
+ self.log.error("Can't decrypt _val for key={}, invalid token "
+ "or value".format(self.key))
return None
except Exception:
- log.error("Can't decrypt _val for key={}, FERNET_KEY "
- "configuration missing".format(self.key))
+ self.log.error("Can't decrypt _val for key={}, FERNET_KEY "
+ "configuration missing".format(self.key))
return None
else:
return self._val
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 434cb71..f4522b5 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -18,6 +18,7 @@
# under the License.
import json
+import logging
import pickle
from sqlalchemy import Column, Integer, String, Index, LargeBinary, and_
@@ -31,6 +32,7 @@ from airflow.utils.helpers import as_tuple
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
+log = logging.getLogger(__name__)
# MAX XCOM Size is 48KB
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
@@ -162,7 +164,6 @@ class XCom(Base, LoggingMixin):
try:
return json.loads(result.value.decode('UTF-8'))
except ValueError:
- log = LoggingMixin().log
log.error("Could not deserialize the XCOM value from JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -226,7 +227,6 @@ class XCom(Base, LoggingMixin):
try:
return json.dumps(value).encode('UTF-8')
except ValueError:
- log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 6c04081..dfde112 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -32,18 +32,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Kerberos security provider"""
-
+import logging
import socket
import subprocess
import sys
import time
-from airflow import LoggingMixin
from airflow.configuration import conf
NEED_KRB181_WORKAROUND = None
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
def renew_from_kt(principal, keytab):
diff --git a/airflow/sensors/hdfs_sensor.py b/airflow/sensors/hdfs_sensor.py
index bc3a3c6..f187184 100644
--- a/airflow/sensors/hdfs_sensor.py
+++ b/airflow/sensors/hdfs_sensor.py
@@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import logging
import re
import sys
from builtins import str
@@ -25,7 +25,9 @@ from airflow import settings
from airflow.hooks.hdfs_hook import HDFSHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+log = logging.getLogger(__name__)
class HdfsSensor(BaseSensorOperator):
@@ -65,7 +67,6 @@ class HdfsSensor(BaseSensorOperator):
:return: (bool) depending on the matching criteria
"""
if size:
- log = LoggingMixin().log
log.debug(
'Filtering for file size >= %s in files: %s',
size, map(lambda x: x['path'], result)
@@ -90,7 +91,6 @@ class HdfsSensor(BaseSensorOperator):
:rtype: list[dict]
"""
if ignore_copying:
- log = LoggingMixin().log
regex_builder = r"^.*\.(%s$)$" % '$|'.join(ignored_ext)
ignored_extensions_regex = re.compile(regex_builder)
log.debug(
diff --git a/airflow/sentry.py b/airflow/sentry.py
index 73dddc7..ff85969 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -18,17 +18,14 @@
# under the License.
"""Sentry Integration"""
-
-
+import logging
from functools import wraps
from airflow.configuration import conf
from airflow.utils.db import provide_session
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
-
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
class DummySentry:
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 8d261aa..34372db 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -29,7 +29,7 @@ import cattr
import pendulum
from dateutil import relativedelta
-from airflow import DAG, AirflowException, LoggingMixin
+from airflow import DAG, AirflowException
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.connection import Connection
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
@@ -46,6 +46,8 @@ except ImportError:
if TYPE_CHECKING:
from inspect import Parameter
+log = logging.getLogger(__name__)
+
class BaseSerialization:
"""BaseSerialization provides utils for serialization."""
@@ -212,10 +214,10 @@ class BaseSerialization:
return cls._encode(
[cls._serialize(v) for v in var], type_=DAT.TUPLE)
else:
- LOG.debug('Cast type %s to str in serialization.', type(var))
+ log.debug('Cast type %s to str in serialization.', type(var))
return str(var)
except Exception: # pylint: disable=broad-except
- LOG.warning('Failed to stringify.', exc_info=True)
+ log.warning('Failed to stringify.', exc_info=True)
return FAILED
@classmethod
@@ -619,7 +621,5 @@ class SerializedDAG(DAG, BaseSerialization):
return cls.deserialize_dag(serialized_obj['dag'])
-LOG = LoggingMixin().log
-
# Serialization failure returns 'failed'.
FAILED = 'serialization_failed'
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3a390e7..4a4b240 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -61,6 +61,8 @@ from airflow.utils.state import State
if six.PY2:
ConnectionError = IOError
+log = logging.getLogger(__name__)
+
class SimpleDag(BaseDag):
"""
@@ -367,7 +369,6 @@ def list_py_file_paths(directory, safe_mode=conf.getboolean('core', 'DAG_DISCOVE
file_paths.append(file_path)
except Exception:
- log = LoggingMixin().log
log.exception("Error while examining %s", f)
if include_examples:
import airflow.example_dags
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index d18c7e8..937a1bf 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -24,14 +24,14 @@ from __future__ import unicode_literals
from functools import wraps
+import logging
import os
import contextlib
from airflow import settings
from airflow.configuration import conf
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
@contextlib.contextmanager
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index ee764f7..42f0056 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -29,6 +29,7 @@ except ImportError:
from collections import Iterable as CollectionIterable
import importlib
+import logging
import os
import smtplib
@@ -40,7 +41,8 @@ from typing import Iterable, List, Union
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.logging_mixin import LoggingMixin
+
+log = logging.getLogger(__name__)
def send_email(to, subject, html_content,
@@ -107,7 +109,6 @@ def send_email_smtp(to, subject, html_content, files=None,
def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
- log = LoggingMixin().log
SMTP_HOST = conf.get('smtp', 'SMTP_HOST')
SMTP_PORT = conf.getint('smtp', 'SMTP_PORT')
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index ef62ec7..de85f7c 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -181,7 +181,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
try:
metadata['max_offset'] = s[max_log_line - 1].execute()[-1].offset if max_log_line > 0 else 0
except Exception:
- self.log.exception('Could not get current log size with log_id: {}'.format(log_id))
+ self.log.exception('Could not get current log size with log_id: %s', log_id)
logs = []
if max_log_line != 0:
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 3e5991e..90131d1 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -70,7 +70,7 @@ class LoggingMixin(object):
try:
return self._log
except AttributeError:
- self._log = logging.root.getChild(
+ self._log = logging.getLogger(
self.__class__.__module__ + '.' + self.__class__.__name__
)
return self._log
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 1939440..234ce43 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -23,6 +23,7 @@ from __future__ import print_function
from __future__ import unicode_literals
import datetime
+import logging
import os
import json
import pendulum
@@ -32,9 +33,8 @@ from sqlalchemy import event, exc
from sqlalchemy.types import Text, DateTime, TypeDecorator
from airflow.configuration import conf
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
utc = pendulum.timezone('UTC')
using_mysql = conf.get('core', 'sql_alchemy_conn').lower().startswith('mysql')
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 6a23089..91d060d 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -20,6 +20,8 @@ from flask import (
g, Blueprint, jsonify, request, url_for
)
+import logging
+
import airflow.api
from airflow.api.common.experimental import delete_dag as delete
from airflow.api.common.experimental import pool as pool_api
@@ -31,12 +33,11 @@ from airflow.api.common.experimental.get_code import get_code
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
from airflow.utils import timezone
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.www.app import csrf
from airflow import models
-_log = LoggingMixin().log
+log = logging.getLogger(__name__)
requires_authentication = airflow.api.API_AUTH.api_auth.requires_authentication
@@ -73,7 +74,7 @@ def trigger_dag(dag_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -86,13 +87,13 @@ def trigger_dag(dag_id):
try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
if getattr(g, 'user', None):
- _log.info("User %s created %s", g.user, dr)
+ log.info("User %s created %s", g.user, dr)
response = jsonify(
message="Created {}".format(dr),
@@ -112,7 +113,7 @@ def delete_dag(dag_id):
try:
count = delete.delete_dag(dag_id)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -134,7 +135,7 @@ def dag_runs(dag_id):
state = request.args.get('state')
dagruns = get_dag_runs(dag_id, state, run_url_route='airflow.graph')
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 400
return response
@@ -155,7 +156,7 @@ def get_dag_code(dag_id):
try:
return get_code(dag_id)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -168,7 +169,7 @@ def task_info(dag_id, task_id):
try:
info = get_task(dag_id, task_id)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -225,7 +226,7 @@ def task_instance_info(dag_id, execution_date, task_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -234,7 +235,7 @@ def task_instance_info(dag_id, execution_date, task_id):
try:
info = get_task_instance(dag_id, task_id, execution_date)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -266,7 +267,7 @@ def dag_run_status(dag_id, execution_date):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -275,7 +276,7 @@ def dag_run_status(dag_id, execution_date):
try:
info = get_dag_run_state(dag_id, execution_date)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -310,7 +311,7 @@ def get_pool(name):
try:
pool = pool_api.get_pool(name=name)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -325,7 +326,7 @@ def get_pools():
try:
pools = pool_api.get_pools()
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -342,7 +343,7 @@ def create_pool():
try:
pool = pool_api.create_pool(**params)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -358,7 +359,7 @@ def delete_pool(name):
try:
pool = pool_api.delete_pool(name=name)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 30b9f75..b101f45 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
#
+import logging
from typing import Any
import six
@@ -29,7 +30,7 @@ from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.middleware.dispatcher import DispatcherMiddleware
import airflow
-from airflow import models, version, LoggingMixin
+from airflow import models, version
from airflow.configuration import conf
from airflow.models.connection import Connection
from airflow.settings import Session, STATE_COLORS
@@ -41,6 +42,7 @@ from airflow import settings
from airflow.utils.net import get_hostname
csrf = CSRFProtect()
+log = logging.getLogger(__name__)
def create_app(config=None, testing=False):
@@ -152,7 +154,6 @@ def create_app(config=None, testing=False):
def integrate_plugins():
"""Integrate plugins to the context"""
- log = LoggingMixin().log
from airflow.plugins_manager import (
admin_views, flask_blueprints, menu_links)
for v in admin_views:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 99b747b..8b1e910 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -65,11 +65,9 @@ from wtforms import (
import nvd3
import airflow
-from airflow import LoggingMixin, configuration
+from airflow import configuration
from airflow.configuration import conf
-from airflow import models
-from airflow import settings
-from airflow import jobs
+from airflow import jobs, models, settings
from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_running,
set_dag_run_state_to_success,
set_dag_run_state_to_failed)
@@ -106,6 +104,8 @@ FILTER_BY_OWNER = False
PAGE_SIZE = conf.getint('webserver', 'page_size')
+log = logging.getLogger(__name__)
+
if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
# filter_by_owner if authentication is enabled and filter_by_owner is true
FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
@@ -2790,7 +2790,6 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
try:
model.value = json.dumps(model.value).encode('UTF-8')
except ValueError:
- log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
diff --git a/airflow/www_rbac/api/experimental/endpoints.py b/airflow/www_rbac/api/experimental/endpoints.py
index 928e4ac..3d37e76 100644
--- a/airflow/www_rbac/api/experimental/endpoints.py
+++ b/airflow/www_rbac/api/experimental/endpoints.py
@@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import logging
import airflow.api
from airflow.api.common.experimental import pool as pool_api
@@ -26,7 +27,6 @@ from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.api.common.experimental.get_code import get_code
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.utils import timezone
from airflow.www_rbac.app import csrf
@@ -35,7 +35,7 @@ from airflow.utils.db import create_session
from flask import g, Blueprint, jsonify, request, url_for
-_log = LoggingMixin().log
+log = logging.getLogger(__name__)
requires_authentication = airflow.api.API_AUTH.api_auth.requires_authentication
@@ -72,7 +72,7 @@ def trigger_dag(dag_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -85,13 +85,13 @@ def trigger_dag(dag_id):
try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
if getattr(g, 'user', None):
- _log.info("User {} created {}".format(g.user, dr))
+ log.info("User {} created {}".format(g.user, dr))
response = jsonify(
message="Created {}".format(dr),
@@ -115,7 +115,7 @@ def dag_runs(dag_id):
state = request.args.get('state')
dagruns = get_dag_runs(dag_id, state)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 400
return response
@@ -136,7 +136,7 @@ def get_dag_code(dag_id):
try:
return get_code(dag_id)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -149,7 +149,7 @@ def task_info(dag_id, task_id):
try:
info = get_task(dag_id, task_id)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -213,7 +213,7 @@ def task_instance_info(dag_id, execution_date, task_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -222,7 +222,7 @@ def task_instance_info(dag_id, execution_date, task_id):
try:
info = get_task_instance(dag_id, task_id, execution_date)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -254,7 +254,7 @@ def dag_run_status(dag_id, execution_date):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
- _log.info(error_message)
+ log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@@ -263,7 +263,7 @@ def dag_run_status(dag_id, execution_date):
try:
info = get_dag_run_state(dag_id, execution_date)
except AirflowException as err:
- _log.info(err)
+ log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -298,7 +298,7 @@ def get_pool(name):
try:
pool = pool_api.get_pool(name=name)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -313,7 +313,7 @@ def get_pools():
try:
pools = pool_api.get_pools()
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -330,7 +330,7 @@ def create_pool():
try:
pool = pool_api.create_pool(**params)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@@ -346,7 +346,7 @@ def delete_pool(name):
try:
pool = pool_api.delete_pool(name=name)
except AirflowException as err:
- _log.error(err)
+ log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
diff --git a/tests/contrib/operators/test_gcp_sql_operator_system_helper.py b/tests/contrib/operators/test_gcp_sql_operator_system_helper.py
index d2898ed..83f6027 100755
--- a/tests/contrib/operators/test_gcp_sql_operator_system_helper.py
+++ b/tests/contrib/operators/test_gcp_sql_operator_system_helper.py
@@ -219,16 +219,17 @@ class CloudSqlQueryTestHelper(LoggingCommandExecutor):
for member in binding['members']:
if not member.startswith('serviceAccount:gcp-storage-account'):
- self.log.info("Remove member: {}".format(member))
+ self.log.info("Remove member: %s", member)
member_type, member_email = member.split(':')
if member_type != 'serviceAccount':
- self.log.warning("Skip removing member {} as the type {} is "
- "not service account".format(member,
- member_type))
+ self.log.warning(
+ "Skip removing member %s as the type %s is not service account",
+ member, member_type
+ )
self.execute_cmd(['gsutil', 'acl', 'ch', '-d', member_email,
"gs://{}".format(export_bucket_name)])
else:
- self.log.info("Skip removing member {}".format(member))
+ self.log.info("Skip removing member %s", member)
@staticmethod
def set_ip_addresses_in_env():
diff --git a/tests/contrib/utils/gcp_authenticator.py b/tests/contrib/utils/gcp_authenticator.py
index 715d6fc..3f9d7ac 100644
--- a/tests/contrib/utils/gcp_authenticator.py
+++ b/tests/contrib/utils/gcp_authenticator.py
@@ -99,7 +99,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
- self.log.info('Airflow DB Session error:' + str(ex))
+ self.log.info('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@@ -125,7 +125,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
- self.log.info('Airflow DB Session error:' + str(ex))
+ self.log.info('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@@ -146,14 +146,14 @@ class GcpAuthenticator(LoggingCommandExecutor):
else:
gcp_config_dir = os.path.join(AIRFLOW_MAIN_FOLDER, os.pardir, "config")
if not os.path.isdir(gcp_config_dir):
- self.log.info("The {} is not a directory".format(gcp_config_dir))
+ self.log.info("The %s is not a directory", gcp_config_dir)
key_dir = os.path.join(gcp_config_dir, "keys")
if not os.path.isdir(key_dir):
- self.log.info("The {} is not a directory".format(key_dir))
+ self.log.info("The %s is not a directory", key_dir)
return
key_path = os.path.join(key_dir, self.gcp_key)
if not os.path.isfile(key_path):
- self.log.info("The {} is missing".format(key_path))
+ self.log.info("The %s file is missing", key_path)
self.full_key_path = key_path
def _validate_key_set(self):
@@ -171,7 +171,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
Authenticate with service account specified via key name.
"""
self._validate_key_set()
- self.log.info("Setting the GCP key to {}".format(self.full_key_path))
+ self.log.info("Setting the GCP key to %s", self.full_key_path)
# Checking if we can authenticate using service account credentials provided
self.execute_cmd(
[
@@ -205,7 +205,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
GcpAuthenticator.original_account = self.check_output(
['gcloud', 'config', 'get-value', 'account', '--project={}'.format(self.project_id)]
).decode('utf-8')
- self.log.info("Storing account: to restore it later {}".format(GcpAuthenticator.original_account))
+ self.log.info("Storing account: to restore it later %s", GcpAuthenticator.original_account)
def gcp_restore_authentication(self):
"""
@@ -213,7 +213,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
"""
self._validate_key_set()
if GcpAuthenticator.original_account:
- self.log.info("Restoring original account stored: {}".format(GcpAuthenticator.original_account))
+ self.log.info("Restoring original account stored: %s", GcpAuthenticator.original_account)
subprocess.call(
[
'gcloud',
diff --git a/tests/contrib/utils/logging_command_executor.py b/tests/contrib/utils/logging_command_executor.py
index 98f870d..58d920d 100644
--- a/tests/contrib/utils/logging_command_executor.py
+++ b/tests/contrib/utils/logging_command_executor.py
@@ -26,32 +26,32 @@ class LoggingCommandExecutor(LoggingMixin):
def execute_cmd(self, cmd, silent=False, cwd=None):
if silent:
- self.log.info("Executing in silent mode: '{}'".format(" ".join(cmd)))
- with open(os.devnull, 'w') as FNULL:
- return subprocess.call(args=cmd, stdout=FNULL, stderr=subprocess.STDOUT)
+ self.log.info("Executing in silent mode: '%s'", " ".join(cmd))
+ with open(os.devnull, 'w') as dev_null:
+ return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)
else:
- self.log.info("Executing: '{}'".format(" ".join(cmd)))
+ self.log.info("Executing: '%s'", " ".join(cmd))
process = subprocess.Popen(
args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, cwd=cwd
)
output, err = process.communicate()
retcode = process.poll()
- self.log.info("Stdout: {}".format(output))
- self.log.info("Stderr: {}".format(err))
+ self.log.info("Stdout: %s", output)
+ self.log.info("Stderr: %s", err)
if retcode:
self.log.warning("Error when executing %s", " ".join(cmd))
return retcode
def check_output(self, cmd):
- self.log.info("Executing for output: '{}'".format(" ".join(cmd)))
+ self.log.info("Executing for output: '%s'", " ".join(cmd))
process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output, err = process.communicate()
retcode = process.poll()
if retcode:
- self.log.info("Error when executing '{}'".format(" ".join(cmd)))
- self.log.info("Stdout: {}".format(output))
- self.log.info("Stderr: {}".format(err))
+ self.log.info("Error when executing '%s'", " ".join(cmd))
+ self.log.info("Stdout: %s", output)
+ self.log.info("Stderr: %s", err)
raise AirflowException("Retcode {} on {} with stdout: {}, stderr: {}".
format(retcode, " ".join(cmd), output, err))
return output
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index cac73e9..9e86d11 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -22,8 +22,8 @@ import unittest
from argparse import Namespace
from airflow.configuration import conf
+from airflow.security import kerberos
from airflow.security.kerberos import renew_from_kt
-from airflow import LoggingMixin
from tests.test_utils.config import conf_vars
@@ -57,7 +57,7 @@ class KerberosTest(unittest.TestCase):
renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab)
- with self.assertLogs(LoggingMixin().log) as log:
+ with self.assertLogs(kerberos.log) as log:
self.assertIn(
'kinit: krb5_init_creds_set_keytab: Failed to find '
'airflow@LUPUS.GRIDDYNAMICS.NET in keytab FILE:{} '