Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/13 07:37:33 UTC
[5/5] incubator-airflow git commit: [AIRFLOW-1582] Improve logging within Airflow
[AIRFLOW-1582] Improve logging within Airflow
Clean up the way logging is done within Airflow. Remove the old
logging.py and move to the airflow.utils.log.* interface. Stop
setting up logging outside of the settings/configuration code. Move
away from string formatting to logging_function(msg, *args).
Closes #2592 from Fokko/AIRFLOW-1582-Improve-logging-structure
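
The change in calling style, as a rough sketch rather than an excerpt from the
diff (the class name below is made up for illustration): code that used to call
the logging module directly with a pre-formatted string now inherits
LoggingMixin and passes arguments to the logging call, so formatting only
happens when the message is actually emitted.

    # Before: module-level logging with eager string formatting
    import logging
    import socket

    hostname = socket.getfqdn()
    logging.info("Running on host {}".format(hostname))

    # After: per-class logger via LoggingMixin, lazy %-style arguments
    from airflow.utils.log.LoggingMixin import LoggingMixin

    class MyHook(LoggingMixin):  # illustrative class, not part of the diff
        def run(self):
            self.logger.info("Running on host %s", socket.getfqdn())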
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7a51890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7a51890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7a51890
Branch: refs/heads/master
Commit: a7a518902dcf1e7fd4bf477cf57cee691f181b29
Parents: 5de632e
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 13 09:36:58 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 13 09:36:58 2017 +0200
----------------------------------------------------------------------
UPDATING.md | 4 +-
airflow/__init__.py | 10 +-
airflow/api/__init__.py | 12 +-
airflow/api/auth/backend/kerberos_auth.py | 14 +-
airflow/api/common/experimental/get_task.py | 4 -
.../common/experimental/get_task_instance.py | 4 -
airflow/bin/airflow | 1 -
airflow/bin/cli.py | 46 +--
airflow/configuration.py | 25 +-
.../auth/backends/github_enterprise_auth.py | 5 +-
airflow/contrib/auth/backends/google_auth.py | 11 +-
airflow/contrib/auth/backends/kerberos_auth.py | 5 +-
airflow/contrib/auth/backends/ldap_auth.py | 32 +-
airflow/contrib/auth/backends/password_auth.py | 10 +-
airflow/contrib/executors/mesos_executor.py | 51 +--
airflow/contrib/hooks/bigquery_hook.py | 57 +--
airflow/contrib/hooks/cloudant_hook.py | 10 +-
airflow/contrib/hooks/databricks_hook.py | 15 +-
airflow/contrib/hooks/datadog_hook.py | 13 +-
airflow/contrib/hooks/datastore_hook.py | 3 +-
airflow/contrib/hooks/ftp_hook.py | 10 +-
airflow/contrib/hooks/gcp_api_base_hook.py | 13 +-
airflow/contrib/hooks/gcp_dataflow_hook.py | 29 +-
airflow/contrib/hooks/gcp_dataproc_hook.py | 28 +-
airflow/contrib/hooks/gcp_mlengine_hook.py | 48 ++-
airflow/contrib/hooks/gcs_hook.py | 10 +-
airflow/contrib/hooks/jira_hook.py | 9 +-
airflow/contrib/hooks/qubole_hook.py | 22 +-
airflow/contrib/hooks/redis_hook.py | 16 +-
airflow/contrib/hooks/salesforce_hook.py | 31 +-
airflow/contrib/hooks/spark_sql_hook.py | 10 +-
airflow/contrib/hooks/spark_submit_hook.py | 24 +-
airflow/contrib/hooks/sqoop_hook.py | 17 +-
airflow/contrib/hooks/ssh_hook.py | 34 +-
airflow/contrib/operators/bigquery_operator.py | 4 +-
.../operators/bigquery_table_delete_operator.py | 4 +-
.../contrib/operators/bigquery_to_bigquery.py | 8 +-
airflow/contrib/operators/bigquery_to_gcs.py | 4 +-
.../contrib/operators/databricks_operator.py | 25 +-
airflow/contrib/operators/dataproc_operator.py | 30 +-
.../operators/datastore_export_operator.py | 5 +-
.../operators/datastore_import_operator.py | 6 +-
airflow/contrib/operators/ecs_operator.py | 24 +-
.../contrib/operators/emr_add_steps_operator.py | 7 +-
.../operators/emr_create_job_flow_operator.py | 10 +-
.../emr_terminate_job_flow_operator.py | 7 +-
airflow/contrib/operators/file_to_wasb.py | 14 +-
airflow/contrib/operators/fs_operator.py | 4 +-
.../contrib/operators/gcs_download_operator.py | 6 +-
airflow/contrib/operators/gcs_to_bq.py | 8 +-
airflow/contrib/operators/hipchat_operator.py | 3 +-
airflow/contrib/operators/mlengine_operator.py | 30 +-
.../operators/mlengine_prediction_summary.py | 2 -
airflow/contrib/operators/mysql_to_gcs.py | 4 +-
airflow/contrib/operators/sftp_operator.py | 7 +-
.../contrib/operators/spark_submit_operator.py | 6 +-
airflow/contrib/operators/ssh_operator.py | 1 -
airflow/contrib/operators/vertica_operator.py | 5 +-
airflow/contrib/operators/vertica_to_hive.py | 5 +-
airflow/contrib/sensors/bigquery_sensor.py | 5 +-
airflow/contrib/sensors/datadog_sensor.py | 5 +-
airflow/contrib/sensors/emr_base_sensor.py | 9 +-
airflow/contrib/sensors/emr_job_flow_sensor.py | 6 +-
airflow/contrib/sensors/emr_step_sensor.py | 5 +-
airflow/contrib/sensors/ftp_sensor.py | 4 +-
airflow/contrib/sensors/gcs_sensor.py | 7 +-
airflow/contrib/sensors/hdfs_sensors.py | 12 +-
airflow/contrib/sensors/jira_sensor.py | 27 +-
airflow/contrib/sensors/redis_key_sensor.py | 4 -
airflow/contrib/sensors/wasb_sensor.py | 11 +-
.../contrib/task_runner/cgroup_task_runner.py | 49 +--
airflow/executors/__init__.py | 8 +-
airflow/executors/base_executor.py | 18 +-
airflow/executors/celery_executor.py | 24 +-
airflow/executors/dask_executor.py | 10 +-
airflow/executors/local_executor.py | 11 +-
airflow/executors/sequential_executor.py | 4 +-
airflow/hooks/S3_hook.py | 52 +--
airflow/hooks/base_hook.py | 9 +-
airflow/hooks/dbapi_hook.py | 33 +-
airflow/hooks/druid_hook.py | 10 +-
airflow/hooks/hive_hooks.py | 37 +-
airflow/hooks/http_hook.py | 7 +-
airflow/hooks/oracle_hook.py | 9 +-
airflow/hooks/pig_hook.py | 6 +-
airflow/hooks/presto_hook.py | 4 -
airflow/hooks/webhdfs_hook.py | 17 +-
airflow/hooks/zendesk_hook.py | 20 +-
airflow/jobs.py | 349 ++++++++++---------
airflow/models.py | 211 +++++------
airflow/operators/bash_operator.py | 23 +-
airflow/operators/check_operator.py | 24 +-
airflow/operators/dagrun_operator.py | 6 +-
airflow/operators/docker_operator.py | 12 +-
airflow/operators/generic_transfer.py | 13 +-
airflow/operators/hive_operator.py | 4 +-
airflow/operators/hive_stats_operator.py | 10 +-
airflow/operators/hive_to_druid.py | 18 +-
airflow/operators/hive_to_mysql.py | 14 +-
airflow/operators/hive_to_samba_operator.py | 6 +-
airflow/operators/http_operator.py | 8 +-
airflow/operators/jdbc_operator.py | 7 +-
airflow/operators/latest_only_operator.py | 19 +-
airflow/operators/mssql_operator.py | 5 +-
airflow/operators/mssql_to_hive.py | 6 +-
airflow/operators/mysql_operator.py | 5 +-
airflow/operators/mysql_to_hive.py | 5 +-
airflow/operators/oracle_operator.py | 5 +-
airflow/operators/pig_operator.py | 4 +-
airflow/operators/postgres_operator.py | 5 +-
airflow/operators/presto_to_mysql.py | 12 +-
airflow/operators/python_operator.py | 24 +-
airflow/operators/redshift_to_s3_operator.py | 17 +-
airflow/operators/s3_file_transform_operator.py | 30 +-
airflow/operators/s3_to_hive_operator.py | 39 +--
airflow/operators/sensors.py | 49 +--
airflow/operators/slack_operator.py | 9 +-
airflow/operators/sqlite_operator.py | 5 +-
airflow/plugins_manager.py | 11 +-
airflow/security/kerberos.py | 25 +-
airflow/settings.py | 17 +-
airflow/task_runner/base_task_runner.py | 9 +-
airflow/utils/dag_processing.py | 55 +--
airflow/utils/db.py | 11 +-
airflow/utils/email.py | 8 +-
airflow/utils/log/LoggingMixin.py | 45 +++
airflow/utils/log/file_task_handler.py | 34 +-
airflow/utils/log/gcs_task_handler.py | 125 ++++++-
airflow/utils/log/s3_task_handler.py | 97 +++++-
airflow/utils/logging.py | 252 -------------
airflow/utils/timeout.py | 17 +-
airflow/www/api/experimental/endpoints.py | 6 +-
airflow/www/app.py | 10 +-
airflow/www/views.py | 9 +-
setup.py | 11 -
tests/contrib/hooks/test_databricks_hook.py | 15 +-
.../contrib/operators/test_dataproc_operator.py | 59 ++--
tests/core.py | 16 +-
tests/operators/sensors.py | 53 +--
tests/utils/log/test_logging.py | 108 ++++++
tests/utils/test_logging.py | 103 ------
141 files changed, 1578 insertions(+), 1747 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 92ee4b4..cde7141 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -13,7 +13,9 @@ assists people when migrating to a new version.
- No updates are required if you are using ftpHook, it will continue work as is.
### Logging update
- Logs now are stored in the log folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``.
+Airflow's logging has been rewritten to use Python’s builtin `logging` module to perform system logging. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The main benefit that this brings to us is the easy configuration of the logging through `default_airflow_logging.py` and the ability to use different handlers for logging.
+
+Logs now are stored in the log folder as `{dag_id}/{task_id}/{execution_date}/{try_number}.log`.
### New Features
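
For module-level code that has no class to extend with LoggingMixin, the diff
obtains the shared logger from a throwaway instance. A minimal sketch of that
pattern, mirroring airflow/configuration.py further below (the config path here
is only an example value):

    from airflow.utils.log.LoggingMixin import LoggingMixin

    log = LoggingMixin().logger

    AIRFLOW_CONFIG = "/home/user/airflow/airflow.cfg"  # example path
    log.info("Reading the config from %s", AIRFLOW_CONFIG)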
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 3daa6e2..8844eeb 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,9 +21,10 @@ in their PYTHONPATH. airflow_login should be based off the
"""
from builtins import object
from airflow import version
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
__version__ = version.version
-import logging
import sys
from airflow import configuration as conf
@@ -40,13 +41,15 @@ login = None
def load_login():
+ log = LoggingMixin().logger
+
auth_backend = 'airflow.default_login'
try:
if conf.getboolean('webserver', 'AUTHENTICATE'):
auth_backend = conf.get('webserver', 'auth_backend')
except conf.AirflowConfigException:
if conf.getboolean('webserver', 'AUTHENTICATE'):
- logging.warning(
+ log.warning(
"auth_backend not found in webserver config reverting to "
"*deprecated* behavior of importing airflow_login")
auth_backend = "airflow_login"
@@ -55,7 +58,7 @@ def load_login():
global login
login = import_module(auth_backend)
except ImportError as err:
- logging.critical(
+ log.critical(
"Cannot import authentication module %s. "
"Please correct your authentication backend or disable authentication: %s",
auth_backend, err
@@ -76,7 +79,6 @@ from airflow import operators
from airflow import hooks
from airflow import executors
from airflow import macros
-from airflow import contrib
operators._integrate_plugins()
hooks._integrate_plugins()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index ae47abf..39edbed 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -13,14 +13,16 @@
# limitations under the License.
from __future__ import print_function
-import logging
-
from airflow.exceptions import AirflowException
from airflow import configuration as conf
from importlib import import_module
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
api_auth = None
+log = LoggingMixin().logger
+
def load_auth():
auth_backend = 'airflow.api.auth.backend.default'
@@ -33,6 +35,8 @@ def load_auth():
global api_auth
api_auth = import_module(auth_backend)
except ImportError as err:
- logging.critical("Cannot import {} for API authentication due to: {}"
- .format(auth_backend, err))
+ log.critical(
+ "Cannot import %s for API authentication due to: %s",
+ auth_backend, err
+ )
raise AirflowException(err)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index d1c3b70..73a5aa2 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -23,10 +23,12 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from future.standard_library import install_aliases
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
install_aliases()
import kerberos
-import logging
import os
from airflow import configuration as conf
@@ -45,6 +47,8 @@ client_auth = HTTPKerberosAuth(service='airflow')
_SERVICE_NAME = None
+log = LoggingMixin().logger
+
def init_app(app):
global _SERVICE_NAME
@@ -52,7 +56,7 @@ def init_app(app):
hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
- logging.info("Kerberos: hostname {}".format(hostname))
+ log.info("Kerberos: hostname %s", hostname)
service = 'airflow'
@@ -62,12 +66,12 @@ def init_app(app):
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
try:
- logging.info("Kerberos init: {} {}".format(service, hostname))
+ log.info("Kerberos init: %s %s", service, hostname)
principal = kerberos.getServerPrincipalDetails(service, hostname)
except kerberos.KrbError as err:
- logging.warning("Kerberos: {}".format(err))
+ log.warning("Kerberos: %s", err)
else:
- logging.info("Kerberos API: server is {}".format(principal))
+ log.info("Kerberos API: server is %s", principal)
def _unauthorized():
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index 39ab423..9023ad1 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -12,13 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.exceptions import AirflowException
from airflow.models import DagBag
-_log = logging.getLogger(__name__)
-
def get_task(dag_id, task_id):
"""Return the task object identified by the given dag_id and task_id."""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index 4c50731..7ab5e6e 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -12,13 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.exceptions import AirflowException
from airflow.models import DagBag
-_log = logging.getLogger(__name__)
-
def get_task_instance(dag_id, task_id, execution_date):
"""Return the task object identified by the given dag_id and task_id."""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/airflow
----------------------------------------------------------------------
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 0598596..2c0024d 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import os
from airflow import configuration
from airflow.bin.cli import CLIFactory
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a0545c3..56f1855 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,6 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
+from airflow.utils.log.LoggingMixin import LoggingMixin
from airflow.www.app import cached_app
from sqlalchemy import func
@@ -63,6 +64,8 @@ api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.api_auth.client_auth)
+log = LoggingMixin().logger
+
def sigint_handler(sig, frame):
sys.exit(0)
@@ -186,19 +189,21 @@ def trigger_dag(args):
:param args:
:return:
"""
+ log = LoggingMixin().logger
try:
message = api_client.trigger_dag(dag_id=args.dag_id,
run_id=args.run_id,
conf=args.conf,
execution_date=args.exec_date)
except IOError as err:
- logging.error(err)
+ log.error(err)
raise AirflowException(err)
-
- logging.info(message)
+ log.info(message)
def pool(args):
+ log = LoggingMixin().logger
+
def _tabulate(pools):
return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
tablefmt="fancy_grid")
@@ -215,9 +220,9 @@ def pool(args):
else:
pools = api_client.get_pools()
except (AirflowException, IOError) as err:
- logging.error(err)
+ log.error(err)
else:
- logging.info(_tabulate(pools=pools))
+ log.info(_tabulate(pools=pools))
def variables(args):
@@ -325,6 +330,8 @@ def run(args, dag=None):
if dag:
args.dag_id = dag.dag_id
+ log = LoggingMixin().logger
+
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
@@ -343,7 +350,7 @@ def run(args, dag=None):
dag = get_dag(args)
elif not dag:
session = settings.Session()
- logging.info('Loading pickle id {args.pickle}'.format(args=args))
+ log.info('Loading pickle id {args.pickle}'.format(args=args))
dag_pickle = session.query(
DagPickle).filter(DagPickle.id == args.pickle).first()
if not dag_pickle:
@@ -354,11 +361,11 @@ def run(args, dag=None):
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()
- logger = logging.getLogger('airflow.task')
+ log = logging.getLogger('airflow.task')
if args.raw:
- logger = logging.getLogger('airflow.task.raw')
+ log = logging.getLogger('airflow.task.raw')
- for handler in logger.handlers:
+ for handler in log.handlers:
try:
handler.set_context(ti)
except AttributeError:
@@ -367,7 +374,7 @@ def run(args, dag=None):
pass
hostname = socket.getfqdn()
- logging.info("Running on host {}".format(hostname))
+ log.info("Running on host %s", hostname)
if args.local:
run_job = jobs.LocalTaskJob(
@@ -396,6 +403,7 @@ def run(args, dag=None):
session.add(pickle)
session.commit()
pickle_id = pickle.id
+ # TODO: This should be written to a log
print((
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals()))
@@ -427,7 +435,7 @@ def run(args, dag=None):
# might subsequently read from the log to insert into S3 or
# Google cloud storage. Explicitly close the handler is
# needed in order to upload to remote storage services.
- for handler in logger.handlers:
+ for handler in log.handlers:
handler.flush()
handler.close()
@@ -449,6 +457,7 @@ def task_failed_deps(args):
dep_context = DepContext(deps=SCHEDULER_DEPS)
failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
+ # TODO, Do we want to print or log this
if failed_deps:
print("Task instance dependencies not met:")
for dep in failed_deps:
@@ -605,8 +614,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
def start_refresh(gunicorn_master_proc):
batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
- logging.debug('%s doing a refresh of %s workers',
- state, batch_size)
+ log.debug('%s doing a refresh of %s workers', state, batch_size)
sys.stdout.flush()
sys.stderr.flush()
@@ -628,14 +636,14 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
# Whenever some workers are not ready, wait until all workers are ready
if num_ready_workers_running < num_workers_running:
- logging.debug('%s some workers are starting up, waiting...', state)
+ log.debug('%s some workers are starting up, waiting...', state)
sys.stdout.flush()
time.sleep(1)
# Kill a worker gracefully by asking gunicorn to reduce number of workers
elif num_workers_running > num_workers_expected:
excess = num_workers_running - num_workers_expected
- logging.debug('%s killing %s workers', state, excess)
+ log.debug('%s killing %s workers', state, excess)
for _ in range(excess):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
@@ -646,7 +654,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
- logging.debug(
+ log.debug(
'%s sleeping for %ss starting doing a refresh...',
state, refresh_interval
)
@@ -655,7 +663,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
else:
# num_ready_workers_running == num_workers_running < num_workers_expected
- logging.error((
+ log.error((
"%s some workers seem to have died and gunicorn"
"did not restart them as expected"
), state)
@@ -770,7 +778,7 @@ def webserver(args):
gunicorn_master_proc_pid = int(f.read())
break
except IOError:
- logging.debug("Waiting for gunicorn's pid file to be created.")
+ log.debug("Waiting for gunicorn's pid file to be created.")
time.sleep(0.1)
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
@@ -896,8 +904,6 @@ def resetdb(args):
if args.yes or input(
"This will drop existing tables if they exist. "
"Proceed? (y/n)").upper() == "Y":
- logging.basicConfig(level=settings.LOGGING_LEVEL,
- format=settings.SIMPLE_LOG_FORMAT)
db_utils.resetdb()
else:
print("Bail.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 460d975..db196f9 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -19,7 +19,6 @@ from __future__ import unicode_literals
import copy
import errno
-import logging
import os
import six
import subprocess
@@ -28,6 +27,9 @@ import shlex
import sys
from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
standard_library.install_aliases()
from builtins import str
@@ -36,6 +38,8 @@ from six.moves import configparser
from airflow.exceptions import AirflowConfigException
+log = LoggingMixin().logger
+
# show Airflow's deprecation warnings
warnings.filterwarnings(
action='default', category=DeprecationWarning, module='airflow')
@@ -198,8 +202,9 @@ class AirflowConfigParser(ConfigParser):
return option
else:
- logging.warning("section/key [{section}/{key}] not found "
- "in config".format(**locals()))
+ log.warning(
+ "section/key [{section}/{key}] not found in config".format(**locals())
+ )
raise AirflowConfigException(
"section/key [{section}/{key}] not found "
@@ -366,20 +371,22 @@ else:
TEMPLATE_START = (
'# ----------------------- TEMPLATE BEGINS HERE -----------------------')
if not os.path.isfile(TEST_CONFIG_FILE):
- logging.info(
- 'Creating new Airflow config file for unit tests in: {}'.format(
- TEST_CONFIG_FILE))
+ log.info(
+ 'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE
+ )
with open(TEST_CONFIG_FILE, 'w') as f:
cfg = parameterized_config(TEST_CONFIG)
f.write(cfg.split(TEMPLATE_START)[-1].strip())
if not os.path.isfile(AIRFLOW_CONFIG):
- logging.info('Creating new Airflow config file in: {}'.format(
- AIRFLOW_CONFIG))
+ log.info(
+ 'Creating new Airflow config file in: %s',
+ AIRFLOW_CONFIG
+ )
with open(AIRFLOW_CONFIG, 'w') as f:
cfg = parameterized_config(DEFAULT_CONFIG)
f.write(cfg.split(TEMPLATE_START)[-1].strip())
-logging.info("Reading the config from " + AIRFLOW_CONFIG)
+log.info("Reading the config from %s", AIRFLOW_CONFIG)
conf = AirflowConfigParser()
conf.read(AIRFLOW_CONFIG)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 91126c7..459e9c9 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
import flask_login
# Need to expose these downstream
@@ -29,8 +27,9 @@ from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
from airflow.configuration import AirflowConfigException
+from airflow.utils.log.LoggingMixin import LoggingMixin
-_log = logging.getLogger(__name__)
+log = LoggingMixin().logger
def get_config_param(param):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index 70c8e13..f38f725 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
import flask_login
# Need to expose these downstream
@@ -28,9 +26,9 @@ from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
-from airflow.configuration import AirflowConfigException
+from airflow.utils.log.LoggingMixin import LoggingMixin
-_log = logging.getLogger(__name__)
+log = LoggingMixin().logger
def get_config_param(param):
@@ -106,7 +104,7 @@ class GoogleAuthBackend(object):
self.oauth_callback)
def login(self, request):
- _log.debug('Redirecting user to Google login')
+ log.debug('Redirecting user to Google login')
return self.google_oauth.authorize(callback=url_for(
'google_oauth_callback',
_external=True,
@@ -142,7 +140,7 @@ class GoogleAuthBackend(object):
return GoogleUser(user)
def oauth_callback(self):
- _log.debug('Google OAuth callback called')
+ log.debug('Google OAuth callback called')
next_url = request.args.get('next') or url_for('admin.index')
@@ -162,7 +160,6 @@ class GoogleAuthBackend(object):
return redirect(url_for('airflow.noaccess'))
except AuthenticationError:
- _log.exception('')
return redirect(url_for('airflow.noaccess'))
session = settings.Session()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index e381059..ffb711f 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,8 +29,7 @@ from flask import url_for, redirect
from airflow import settings
from airflow import models
from airflow import configuration
-
-import logging
+from airflow.utils.log.LoggingMixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
@@ -41,7 +40,7 @@ class AuthenticationError(Exception):
pass
-class KerberosUser(models.User):
+class KerberosUser(models.User, LoggingMixin):
def __init__(self, user):
self.user = user
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 341f710..8ce0875 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -30,16 +30,16 @@ from airflow import models
from airflow import configuration
from airflow.configuration import AirflowConfigException
-import logging
-
import traceback
import re
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-LOG = logging.getLogger(__name__)
+log = LoggingMixin().logger
class AuthenticationError(Exception):
@@ -64,7 +64,7 @@ def get_ldap_connection(dn=None, password=None):
conn = Connection(server, native(dn), native(password))
if not conn.bind():
- LOG.error("Cannot bind to ldap server: %s ", conn.last_error)
+ log.error("Cannot bind to ldap server: %s ", conn.last_error)
raise AuthenticationError("Cannot bind to ldap server")
return conn
@@ -74,7 +74,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
search_filter = '(&({0}))'.format(group_filter)
if not conn.search(native(search_base), native(search_filter),
attributes=[native(user_name_attr)]):
- LOG.warning("Unable to find group for %s %s", search_base, search_filter)
+ log.warning("Unable to find group for %s %s", search_base, search_filter)
else:
for resp in conn.response:
if (
@@ -95,11 +95,11 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
memberof_attr = "memberOf"
res = conn.search(native(search_base), native(search_filter), attributes=[native(memberof_attr)])
if not res:
- LOG.info("Cannot find user %s", username)
+ log.info("Cannot find user %s", username)
raise AuthenticationError("Invalid username or password")
if conn.response and memberof_attr not in conn.response[0]["attributes"]:
- LOG.warning("""Missing attribute "%s" when looked-up in Ldap database.
+ log.warning("""Missing attribute "%s" when looked-up in Ldap database.
The user does not seem to be a member of a group and therefore won't see any dag
if the option filter_by_owner=True and owner_mode=ldapgroup are set""", memberof_attr)
return []
@@ -111,7 +111,7 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
try:
groups_list = [regex.search(i).group(1) for i in user_groups]
except IndexError:
- LOG.warning("Parsing error when retrieving the user's group(s)."
+ log.warning("Parsing error when retrieving the user's group(s)."
" Check if the user belongs to at least one group"
" or if the user's groups name do not contain special characters")
@@ -134,7 +134,7 @@ class LdapUser(models.User):
user.username)
except AirflowConfigException:
self.superuser = True
- LOG.debug("Missing configuration for superuser settings. Skipping.")
+ log.debug("Missing configuration for superuser settings. Skipping.")
try:
self.data_profiler = group_contains_user(conn,
@@ -144,7 +144,7 @@ class LdapUser(models.User):
user.username)
except AirflowConfigException:
self.data_profiler = True
- LOG.debug("Missing configuration for dataprofiler settings. Skipping")
+ log.debug("Missing configuration for dataprofiler settings. Skipping")
# Load the ldap group(s) a user belongs to
try:
@@ -154,7 +154,7 @@ class LdapUser(models.User):
configuration.get("ldap", "user_name_attr"),
user.username)
except AirflowConfigException:
- LOG.debug("Missing configuration for ldap settings. Skipping")
+ log.debug("Missing configuration for ldap settings. Skipping")
@staticmethod
def try_login(username, password):
@@ -185,7 +185,7 @@ class LdapUser(models.User):
# todo: use list or result?
if not res:
- LOG.info("Cannot find user %s", username)
+ log.info("Cannot find user %s", username)
raise AuthenticationError("Invalid username or password")
entry = conn.response[0]
@@ -200,14 +200,14 @@ class LdapUser(models.User):
try:
conn = get_ldap_connection(entry['dn'], password)
except KeyError as e:
- LOG.error("""
+ log.error("""
Unable to parse LDAP structure. If you're using Active Directory and not specifying an OU, you must set search_scope=SUBTREE in airflow.cfg.
%s
""" % traceback.format_exc())
raise LdapException("Could not parse LDAP structure. Try setting search_scope in airflow.cfg, or check logs")
if not conn:
- LOG.info("Password incorrect for user %s", username)
+ log.info("Password incorrect for user %s", username)
raise AuthenticationError("Invalid username or password")
def is_active(self):
@@ -237,7 +237,7 @@ class LdapUser(models.User):
@login_manager.user_loader
def load_user(userid):
- LOG.debug("Loading user %s", userid)
+ log.debug("Loading user %s", userid)
if not userid or userid == 'None':
return None
@@ -270,7 +270,7 @@ def login(self, request):
try:
LdapUser.try_login(username, password)
- LOG.info("User %s successfully authenticated", username)
+ log.info("User %s successfully authenticated", username)
session = settings.Session()
user = session.query(models.User).filter(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 000b986..3ad2a8b 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,15 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
from airflow import settings
from airflow import models
-from airflow import configuration
-
-import logging
+from airflow.utils.log.LoggingMixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-LOG = logging.getLogger(__name__)
+log = LoggingMixin().logger
PY3 = version_info[0] == 3
@@ -94,7 +92,7 @@ class PasswordUser(models.User):
@login_manager.user_loader
def load_user(userid):
- LOG.debug("Loading user %s", userid)
+ log.debug("Loading user %s", userid)
if not userid or userid == 'None':
return None
@@ -137,7 +135,7 @@ def login(self, request):
if not user.authenticate(password):
session.close()
raise AuthenticationError()
- LOG.info("User %s successfully authenticated", username)
+ log.info("User %s successfully authenticated", username)
flask_login.login_user(user)
session.commit()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 49788fc..19d72ed 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -13,9 +13,12 @@
# limitations under the License.
from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.www.utils import LoginMixin
+
standard_library.install_aliases()
from builtins import str
-import logging
from queue import Queue
import mesos.interface
@@ -41,7 +44,7 @@ def get_framework_name():
# AirflowMesosScheduler, implements Mesos Scheduler interface
# To schedule airflow jobs on mesos
-class AirflowMesosScheduler(mesos.interface.Scheduler):
+class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
"""
Airflow Mesos scheduler implements mesos scheduler interface
to schedule airflow tasks on mesos.
@@ -49,7 +52,6 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
'airflow run <dag_id> <task_instance_id> <start_date> --local -p=<pickle>'
to run on a mesos slave.
"""
-
def __init__(self,
task_queue,
result_queue,
@@ -63,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
self.task_key_map = {}
def registered(self, driver, frameworkId, masterInfo):
- logging.info("AirflowScheduler registered to mesos with framework ID %s", frameworkId.value)
+ self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
# Import here to work around a circular import error
@@ -84,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
Session.remove()
def reregistered(self, driver, masterInfo):
- logging.info("AirflowScheduler re-registered to mesos")
+ self.logger.info("AirflowScheduler re-registered to mesos")
def disconnected(self, driver):
- logging.info("AirflowScheduler disconnected from mesos")
+ self.logger.info("AirflowScheduler disconnected from mesos")
def offerRescinded(self, driver, offerId):
- logging.info("AirflowScheduler offer %s rescinded", str(offerId))
+ self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
def frameworkMessage(self, driver, executorId, slaveId, message):
- logging.info("AirflowScheduler received framework message %s", message)
+ self.logger.info("AirflowScheduler received framework message %s", message)
def executorLost(self, driver, executorId, slaveId, status):
- logging.warning("AirflowScheduler executor %s lost", str(executorId))
+ self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
def slaveLost(self, driver, slaveId):
- logging.warning("AirflowScheduler slave %s lost", str(slaveId))
+ self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
def error(self, driver, message):
- logging.error("AirflowScheduler driver aborted %s", message)
+ self.logger.error("AirflowScheduler driver aborted %s", message)
raise AirflowException("AirflowScheduler driver aborted %s" % message)
def resourceOffers(self, driver, offers):
@@ -116,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
elif resource.name == "mem":
offerMem += resource.scalar.value
- logging.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+ self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
remainingCpus = offerCpus
remainingMem = offerMem
@@ -129,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
self.task_counter += 1
self.task_key_map[str(tid)] = key
- logging.info("Launching task %d using offer %s", tid, offer.id.value)
+ self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
@@ -159,15 +161,17 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
driver.launchTasks(offer.id, tasks)
def statusUpdate(self, driver, update):
- logging.info("Task %s is in state %s, data %s",
- update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data))
+ self.logger.info(
+ "Task %s is in state %s, data %s",
+ update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
+ )
try:
key = self.task_key_map[update.task_id.value]
except KeyError:
# The map may not contain an item if the framework re-registered after a failover.
# Discard these tasks.
- logging.warning("Unrecognised task key %s" % update.task_id.value)
+ self.logger.warning("Unrecognised task key %s", update.task_id.value)
return
if update.state == mesos_pb2.TASK_FINISHED:
@@ -181,7 +185,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
self.task_queue.task_done()
-class MesosExecutor(BaseExecutor):
+class MesosExecutor(BaseExecutor, LoginMixin):
"""
MesosExecutor allows distributing the execution of task
instances to multiple mesos workers.
@@ -192,7 +196,6 @@ class MesosExecutor(BaseExecutor):
elastic distributed systems to easily be built and run effectively.
See http://mesos.apache.org/
"""
-
def start(self):
self.task_queue = Queue()
self.result_queue = Queue()
@@ -200,7 +203,7 @@ class MesosExecutor(BaseExecutor):
framework.user = ''
if not configuration.get('mesos', 'MASTER'):
- logging.error("Expecting mesos master URL for mesos executor")
+ self.logger.error("Expecting mesos master URL for mesos executor")
raise AirflowException("mesos.master not provided for mesos executor")
master = configuration.get('mesos', 'MASTER')
@@ -236,17 +239,19 @@ class MesosExecutor(BaseExecutor):
else:
framework.checkpoint = False
- logging.info('MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
- master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint))
+ self.logger.info(
+ 'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
+ master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
+ )
implicit_acknowledgements = 1
if configuration.getboolean('mesos', 'AUTHENTICATE'):
if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
- logging.error("Expecting authentication principal in the environment")
+ self.logger.error("Expecting authentication principal in the environment")
raise AirflowException("mesos.default_principal not provided in authenticated mode")
if not configuration.get('mesos', 'DEFAULT_SECRET'):
- logging.error("Expecting authentication secret in the environment")
+ self.logger.error("Expecting authentication secret in the environment")
raise AirflowException("mesos.default_secret not provided in authenticated mode")
credential = mesos_pb2.Credential()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d2ce2b0..497fa28 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -18,7 +18,6 @@ This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
-import logging
import time
from apiclient.discovery import build, HttpError
@@ -33,11 +32,10 @@ from past.builtins import basestring
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-logging.getLogger("bigquery").setLevel(logging.INFO)
-
-class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
+class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
"""
Interact with BigQuery. This hook uses the Google Cloud Platform
connection.
@@ -178,13 +176,12 @@ class BigQueryConnection(object):
"BigQueryConnection does not have transactions")
-class BigQueryBaseCursor(object):
+class BigQueryBaseCursor(LoggingMixin):
"""
The BigQuery base cursor contains helper methods to execute queries against
BigQuery. The methods can be used directly by operators, in cases where a
PEP 249 cursor isn't needed.
"""
-
def __init__(self, service, project_id):
self.service = service
self.project_id = project_id
@@ -290,10 +287,12 @@ class BigQueryBaseCursor(object):
:param print_header: Whether to print a header for a CSV file extract.
:type print_header: boolean
"""
+
source_project, source_dataset, source_table = \
_split_tablename(table_input=source_project_dataset_table,
default_project_id=self.project_id,
var_name='source_project_dataset_table')
+
configuration = {
'extract': {
'sourceTable': {
@@ -500,7 +499,7 @@ class BigQueryBaseCursor(object):
"'WRITE_APPEND' or 'WRITE_TRUNCATE'."
)
else:
- logging.info(
+ self.logger.info(
"Adding experimental "
"'schemaUpdateOptions': {0}".format(schema_update_options)
)
@@ -577,12 +576,12 @@ class BigQueryBaseCursor(object):
)
)
else:
- logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+ self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
time.sleep(5)
except HttpError as err:
if err.resp.status in [500, 503]:
- logging.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+ self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
time.sleep(5)
else:
raise Exception(
@@ -661,14 +660,14 @@ class BigQueryBaseCursor(object):
datasetId=deletion_dataset,
tableId=deletion_table) \
.execute()
- logging.info('Deleted table %s:%s.%s.',
+ self.logger.info('Deleted table %s:%s.%s.',
deletion_project, deletion_dataset, deletion_table)
except HttpError:
if not ignore_if_missing:
raise Exception(
'Table deletion failed. Table does not exist.')
else:
- logging.info('Table does not exist. Skipping.')
+ self.logger.info('Table does not exist. Skipping.')
def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -695,8 +694,10 @@ class BigQueryBaseCursor(object):
for table in tables_list_resp.get('tables', []):
if table['tableReference']['tableId'] == table_id:
# found the table, do update
- logging.info('table %s:%s.%s exists, updating.',
- project_id, dataset_id, table_id)
+ self.logger.info(
+ 'Table %s:%s.%s exists, updating.',
+ project_id, dataset_id, table_id
+ )
return self.service.tables().update(projectId=project_id,
datasetId=dataset_id,
tableId=table_id,
@@ -711,8 +712,10 @@ class BigQueryBaseCursor(object):
# If there is no next page, then the table doesn't exist.
else:
# do insert
- logging.info('table %s:%s.%s does not exist. creating.',
- project_id, dataset_id, table_id)
+ self.logger.info(
+ 'Table %s:%s.%s does not exist. creating.',
+ project_id, dataset_id, table_id
+ )
return self.service.tables().insert(projectId=project_id,
datasetId=dataset_id,
body=table_resource).execute()
@@ -756,18 +759,20 @@ class BigQueryBaseCursor(object):
'tableId': view_table}}
# check to see if the view we want to add already exists.
if view_access not in access:
- logging.info('granting table %s:%s.%s authorized view access to %s:%s dataset.',
- view_project, view_dataset, view_table,
- source_project, source_dataset)
+ self.logger.info(
+ 'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
+ view_project, view_dataset, view_table, source_project, source_dataset
+ )
access.append(view_access)
return self.service.datasets().patch(projectId=source_project,
datasetId=source_dataset,
body={'access': access}).execute()
else:
# if view is already in access, do nothing.
- logging.info('table %s:%s.%s already has authorized view access to %s:%s dataset.',
- view_project, view_dataset, view_table,
- source_project, source_dataset)
+ self.logger.info(
+ 'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
+ view_project, view_dataset, view_table, source_project, source_dataset
+ )
return source_dataset_resource
@@ -1027,10 +1032,12 @@ def _split_tablename(table_input, default_project_id, var_name=None):
if project_id is None:
if var_name is not None:
- logging.info(
- 'project not included in {var}: '
- '{input}; using project "{project}"'.format(
- var=var_name, input=table_input, project=default_project_id))
+ log = LoggingMixin().logger
+ log.info(
+ 'Project not included in {var}: {input}; using project "{project}"'.format(
+ var=var_name, input=table_input, project=default_project_id
+ )
+ )
project_id = default_project_id
return project_id, dataset_id, table_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index 6cea26f..d9db08d 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -15,10 +15,10 @@
from past.builtins import unicode
import cloudant
-import logging
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
class CloudantHook(BaseHook):
@@ -35,9 +35,11 @@ class CloudantHook(BaseHook):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
- logging.debug(('cloudant-python does not support unicode. '
- 'Encoding %s as ascii using "ignore".'),
- s)
+ log = LoggingMixin().logger
+ log.debug(
+ 'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
+ s
+ )
return s.encode('ascii', 'ignore')
return s
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 18e20c4..7b20433 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
import requests
from airflow import __version__
@@ -22,6 +20,7 @@ from airflow.hooks.base_hook import BaseHook
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase
+from airflow.utils.log.LoggingMixin import LoggingMixin
try:
from urllib import parse as urlparse
@@ -35,7 +34,7 @@ CANCEL_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/cancel')
USER_AGENT_HEADER = {'user-agent': 'airflow-{v}'.format(v=__version__)}
-class DatabricksHook(BaseHook):
+class DatabricksHook(BaseHook, LoggingMixin):
"""
Interact with Databricks.
"""
@@ -101,10 +100,10 @@ class DatabricksHook(BaseHook):
host=self._parse_host(self.databricks_conn.host),
endpoint=endpoint)
if 'token' in self.databricks_conn.extra_dejson:
- logging.info('Using token auth.')
+ self.logger.info('Using token auth.')
auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
else:
- logging.info('Using basic auth.')
+ self.logger.info('Using basic auth.')
auth = (self.databricks_conn.login, self.databricks_conn.password)
if method == 'GET':
request_func = requests.get
@@ -130,8 +129,10 @@ class DatabricksHook(BaseHook):
response.content, response.status_code))
except (requests_exceptions.ConnectionError,
requests_exceptions.Timeout) as e:
- logging.error(('Attempt {0} API Request to Databricks failed ' +
- 'with reason: {1}').format(attempt_num, e))
+ self.logger.error(
+ 'Attempt %s API Request to Databricks failed with reason: %s',
+ attempt_num, e
+ )
raise AirflowException(('API requests to Databricks failed {} times. ' +
'Giving up.').format(self.retry_limit))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 2125701..0f5af00 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -13,14 +13,14 @@
# limitations under the License.
import time
-import logging
-
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from datadog import initialize, api
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
-class DatadogHook(BaseHook):
+class DatadogHook(BaseHook, LoggingMixin):
"""
Uses datadog API to send metrics of practically anything measurable,
so it's possible to track # of db records inserted/deleted, records read
@@ -32,7 +32,6 @@ class DatadogHook(BaseHook):
:param datadog_conn_id: The connection to datadog, containing metadata for api keys.
:param datadog_conn_id: string
"""
-
def __init__(self, datadog_conn_id='datadog_default'):
conn = self.get_connection(datadog_conn_id)
self.api_key = conn.extra_dejson.get('api_key', None)
@@ -48,7 +47,7 @@ class DatadogHook(BaseHook):
if self.app_key is None:
raise AirflowException("app_key must be specified in the Datadog connection details")
- logging.info("Setting up api keys for datadog")
+ self.logger.info("Setting up api keys for Datadog")
options = {
'api_key': self.api_key,
'app_key': self.app_key
@@ -57,8 +56,8 @@ class DatadogHook(BaseHook):
def validate_response(self, response):
if response['status'] != 'ok':
- logging.error("Data dog returned: " + response)
- raise AirflowException("Error status received from datadog")
+ self.logger.error("Datadog returned: %s", response)
+ raise AirflowException("Error status received from Datadog")
def send_metric(self, metric_name, datapoint, tags=None):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 7a4386a..2ff1600 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -15,7 +15,6 @@
import json
import time
-import logging
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
@@ -137,7 +136,7 @@ class DatastoreHook(GoogleCloudBaseHook):
result = self.get_operation(name)
state = result['metadata']['common']['state']
if state == 'PROCESSING':
- logging.info('Operation is processing. Re-polling state in {} seconds'
+ self.logger.info('Operation is processing. Re-polling state in {} seconds'
.format(polling_interval_in_seconds))
time.sleep(polling_interval_in_seconds)
else:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index 148811f..a6b3181 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -15,11 +15,12 @@
import datetime
import ftplib
-import logging
import os.path
from airflow.hooks.base_hook import BaseHook
from past.builtins import basestring
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
def mlsd(conn, path="", facts=None):
"""
@@ -54,7 +55,7 @@ def mlsd(conn, path="", facts=None):
yield (name, entry)
-class FTPHook(BaseHook):
+class FTPHook(BaseHook, LoggingMixin):
"""
Interact with FTP.
@@ -166,10 +167,9 @@ class FTPHook(BaseHook):
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
- logging.info('Retrieving file from FTP: {}'.format(remote_full_path))
+ self.logger.info('Retrieving file from FTP: %s', remote_full_path)
conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
- logging.info('Finished retrieving file from FTP: {}'.format(
- remote_full_path))
+ self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
if is_path:
output_handle.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 48c5979..7476c90 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -12,18 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
-import json
-
import httplib2
from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class GoogleCloudBaseHook(BaseHook):
+
+class GoogleCloudBaseHook(BaseHook, LoggingMixin):
"""
A base hook for Google cloud-related hooks. Google cloud has a shared REST
API client that is built in the same way no matter which service you use.
@@ -43,7 +41,6 @@ class GoogleCloudBaseHook(BaseHook):
Legacy P12 key files are not supported.
"""
-
def __init__(self, conn_id, delegate_to=None):
"""
:param conn_id: The connection ID to use when fetching connection info.
@@ -69,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook):
kwargs['sub'] = self.delegate_to
if not key_path:
- logging.info('Getting connection using `gcloud auth` user, since no key file '
+ self.logger.info('Getting connection using `gcloud auth` user, since no key file '
'is defined for hook.')
credentials = GoogleCredentials.get_application_default()
else:
@@ -77,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook):
raise AirflowException('Scope should be defined when using a key file.')
scopes = [s.strip() for s in scope.split(',')]
if key_path.endswith('.json'):
- logging.info('Getting connection using a JSON key file.')
+ self.logger.info('Getting connection using a JSON key file.')
credentials = ServiceAccountCredentials\
.from_json_keyfile_name(key_path, scopes)
elif key_path.endswith('.p12'):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index fc73288..66dfb07 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
import select
import subprocess
import time
@@ -21,10 +19,10 @@ import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class _DataflowJob(object):
-
+class _DataflowJob(LoggingMixin):
def __init__(self, dataflow, project_number, name):
self._dataflow = dataflow
self._project_number = project_number
@@ -49,11 +47,15 @@ class _DataflowJob(object):
job = self._dataflow.projects().jobs().get(projectId=self._project_number,
jobId=self._job_id).execute()
if 'currentState' in job:
- logging.info('Google Cloud DataFlow job %s is %s', job['name'],
- job['currentState'])
+ self.logger.info(
+ 'Google Cloud DataFlow job %s is %s',
+ job['name'], job['currentState']
+ )
else:
- logging.info('Google Cloud DataFlow with job_id %s has name %s', self._job_id,
- job['name'])
+ self.logger.info(
+ 'Google Cloud DataFlow with job_id %s has name %s',
+ self._job_id, job['name']
+ )
return job
def wait_for_done(self):
@@ -70,7 +72,7 @@ class _DataflowJob(object):
elif 'JOB_STATE_RUNNING' == self._job['currentState']:
time.sleep(10)
else:
- logging.debug(str(self._job))
+ self.logger.debug(str(self._job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
self._job['name'], self._job['currentState']))
@@ -83,8 +85,7 @@ class _DataflowJob(object):
return self._job
-class _Dataflow(object):
-
+class _Dataflow(LoggingMixin):
def __init__(self, cmd):
self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -105,15 +106,15 @@ class _Dataflow(object):
def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
- logging.info("Start waiting for DataFlow process to complete.")
+ self.logger.info("Start waiting for DataFlow process to complete.")
while self._proc.poll() is None:
ret = select.select(reads, [], [], 5)
if ret is not None:
for fd in ret[0]:
line = self._line(fd)
- logging.debug(line[:-1])
+ self.logger.debug(line[:-1])
else:
- logging.info("Waiting for DataFlow process to complete.")
+ self.logger.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
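
The _DataflowJob and _Dataflow hunks also show why the commit message insists on logging_function(msg, *args): with '...'.format(arg) the message is always built, while with %s arguments the interpolation only happens when a handler actually emits the record. A stdlib-only illustration, with job as a placeholder value:

import logging

log = logging.getLogger(__name__)
job = {'name': 'example', 'currentState': 'JOB_STATE_RUNNING'}  # placeholder

# Eager: str(job) is computed and the message built even if DEBUG is filtered out.
log.debug('Google Cloud DataFlow job state: {}'.format(job))

# Lazy: job is only rendered if a handler actually emits this record.
log.debug('Google Cloud DataFlow job state: %s', job)
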
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index c1d8993..3a1336e 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import logging
import time
import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class _DataProcJob:
+class _DataProcJob(LoggingMixin):
def __init__(self, dataproc_api, project_id, job):
self.dataproc_api = dataproc_api
self.project_id = project_id
@@ -30,8 +30,10 @@ class _DataProcJob:
region='global',
body=job).execute()
self.job_id = self.job['reference']['jobId']
- logging.info('DataProc job %s is %s', self.job_id,
- str(self.job['status']['state']))
+ self.logger.info(
+ 'DataProc job %s is %s',
+ self.job_id, str(self.job['status']['state'])
+ )
def wait_for_done(self):
while True:
@@ -41,21 +43,23 @@ class _DataProcJob:
jobId=self.job_id).execute()
if 'ERROR' == self.job['status']['state']:
print(str(self.job))
- logging.error('DataProc job %s has errors', self.job_id)
- logging.error(self.job['status']['details'])
- logging.debug(str(self.job))
+ self.logger.error('DataProc job %s has errors', self.job_id)
+ self.logger.error(self.job['status']['details'])
+ self.logger.debug(str(self.job))
return False
if 'CANCELLED' == self.job['status']['state']:
print(str(self.job))
- logging.warning('DataProc job %s is cancelled', self.job_id)
+ self.logger.warning('DataProc job %s is cancelled', self.job_id)
if 'details' in self.job['status']:
- logging.warning(self.job['status']['details'])
- logging.debug(str(self.job))
+ self.logger.warning(self.job['status']['details'])
+ self.logger.debug(str(self.job))
return False
if 'DONE' == self.job['status']['state']:
return True
- logging.debug('DataProc job %s is %s', self.job_id,
- str(self.job['status']['state']))
+ self.logger.debug(
+ 'DataProc job %s is %s',
+ self.job_id, str(self.job['status']['state'])
+ )
time.sleep(5)
def raise_error(self, message=None):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 47d9700..35f31a7 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -13,44 +13,40 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
-import logging
import random
import time
-from airflow import settings
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from apiclient.discovery import build
from apiclient import errors
+from apiclient.discovery import build
from oauth2client.client import GoogleCredentials
-logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL)
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
+ log = LoggingMixin().logger
for i in range(0, max_n):
try:
response = request.execute()
if is_error_func(response):
raise ValueError(
- 'The response contained an error: {}'.format(response))
+ 'The response contained an error: {}'.format(response)
+ )
elif is_done_func(response):
- logging.info('Operation is done: {}'.format(response))
+ log.info('Operation is done: %s', response)
return response
else:
time.sleep((2**i) + (random.randint(0, 1000) / 1000))
except errors.HttpError as e:
if e.resp.status != 429:
- logging.info(
- 'Something went wrong. Not retrying: {}'.format(e))
+ log.info('Something went wrong. Not retrying: %s', e)
raise
else:
time.sleep((2**i) + (random.randint(0, 1000) / 1000))
class MLEngineHook(GoogleCloudBaseHook):
-
def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
super(MLEngineHook, self).__init__(gcp_conn_id, delegate_to)
self._mlengine = self.get_conn()
@@ -107,17 +103,20 @@ class MLEngineHook(GoogleCloudBaseHook):
if use_existing_job_fn is not None:
existing_job = self._get_job(project_id, job_id)
if not use_existing_job_fn(existing_job):
- logging.error(
- 'Job with job_id {} already exist, but it does '
- 'not match our expectation: {}'.format(
- job_id, existing_job))
+ self.logger.error(
+ 'Job with job_id %s already exists, but it does '
+ 'not match our expectation: %s',
+ job_id, existing_job
+ )
raise
- logging.info(
- 'Job with job_id {} already exist. Will waiting for it to '
- 'finish'.format(job_id))
+ self.logger.info(
+ 'Job with job_id %s already exists. Waiting for it to finish',
+ job_id
+ )
else:
- logging.error('Failed to create MLEngine job: {}'.format(e))
+ self.logger.error('Failed to create MLEngine job: %s', e)
raise
+
return self._wait_for_job_done(project_id, job_id)
def _get_job(self, project_id, job_id):
@@ -140,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
# polling after 30 seconds when quota failure occurs
time.sleep(30)
else:
- logging.error('Failed to get MLEngine job: {}'.format(e))
+ self.logger.error('Failed to get MLEngine job: %s', e)
raise
def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -192,11 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
try:
response = request.execute()
- logging.info(
- 'Successfully set version: {} to default'.format(response))
+ self.logger.info('Successfully set version: %s to default', response)
return response
except errors.HttpError as e:
- logging.error('Something went wrong: {}'.format(e))
+ self.logger.error('Something went wrong: %s', e)
raise
def list_versions(self, project_id, model_name):
@@ -264,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
return request.execute()
except errors.HttpError as e:
if e.resp.status == 404:
- logging.error('Model was not found: {}'.format(e))
+ self.logger.error('Model was not found: %s', e)
return None
raise
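
_poll_with_exponential_delay has no instance to hang a logger on, so the hunk above borrows one with LoggingMixin().logger. A sketch of the same idea in a free-standing helper; poll_until_done, is_done and max_attempts are illustrative names, and the request object is assumed to expose execute() exactly as in the hook code:

import random
import time

from airflow.utils.log.LoggingMixin import LoggingMixin


def poll_until_done(request, max_attempts, is_done):
    # No `self` here, so take a logger from a throwaway mixin instance.
    log = LoggingMixin().logger
    for attempt in range(max_attempts):
        response = request.execute()
        if is_done(response):
            log.info('Operation is done: %s', response)
            return response
        # Exponential backoff with jitter between polls.
        time.sleep((2 ** attempt) + random.randint(0, 1000) / 1000.0)
    raise RuntimeError('Operation did not finish in {} attempts'.format(max_attempts))
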
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index b5f3edc..eb17c3b 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -12,17 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
-
from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from googleapiclient import errors
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
-
class GoogleCloudStorageHook(GoogleCloudBaseHook):
"""
@@ -187,8 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
ts = ts.replace(tzinfo=dateutil.tz.tzutc())
updated = dateutil.parser.parse(response['updated'])
- logging.log(logging.INFO, "Verify object date: " + str(updated)
- + " > " + str(ts))
+ self.logger.info("Verify object date: %s > %s", updated, ts)
if updated > ts:
return True
@@ -253,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
).execute()
if 'items' not in response:
- logging.info("No items found for prefix:{}".format(prefix))
+ self.logger.info("No items found for prefix: %s", prefix)
break
for item in response['items']:
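
The gcs_hook hunk also drops the import-time setLevel() call, in line with the commit's goal of keeping logging setup inside the settings/configuration code. A level set at import time silently overrides whatever the application configured; once removed, the logger inherits its effective level from its parent. A stdlib-only illustration:

import logging

logging.basicConfig(level=logging.DEBUG)   # application-wide configuration
gcs_log = logging.getLogger('google_cloud_storage')

gcs_log.setLevel(logging.INFO)             # old import-time override
print(gcs_log.getEffectiveLevel())         # 20 (INFO), the configuration is ignored

gcs_log.setLevel(logging.NOTSET)           # equivalent to removing the override
print(gcs_log.getEffectiveLevel())         # 10 (DEBUG), inherited from the root logger
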
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 148101b..8702608 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -11,24 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from jira import JIRA
from jira.exceptions import JIRAError
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class JiraHook(BaseHook):
+class JiraHook(BaseHook, LoggingMixin):
"""
Jira interaction hook, a Wrapper around JIRA Python SDK.
:param jira_conn_id: reference to a pre-defined Jira Connection
:type jira_conn_id: string
"""
-
def __init__(self,
jira_conn_id='jira_default'):
super(JiraHook, self).__init__(jira_conn_id)
@@ -38,7 +35,7 @@ class JiraHook(BaseHook):
def get_conn(self):
if not self.client:
- logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id))
+ self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
get_server_info = True
validate = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index c51a757..1a5e7ec 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -16,12 +16,12 @@
import os
import time
import datetime
-import logging
import six
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
from airflow.utils.state import State
from qds_sdk.qubole import Qubole
@@ -68,7 +68,7 @@ COMMAND_ARGS = {
}
-class QuboleHook(BaseHook):
+class QuboleHook(BaseHook, LoggingMixin):
def __init__(self, *args, **kwargs):
conn = self.get_connection(kwargs['qubole_conn_id'])
Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -84,31 +84,33 @@ class QuboleHook(BaseHook):
cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
if cmd_id is not None:
- logger = logging.getLogger("QuboleHook")
cmd = Command.find(cmd_id)
if cmd is not None:
+ log = LoggingMixin().logger
if cmd.status == 'done':
- logger.info('Command ID: %s has been succeeded, hence marking this '
+ log.info('Command ID: %s has succeeded, hence marking this '
'TI as Success.', cmd_id)
ti.state = State.SUCCESS
elif cmd.status == 'running':
- logger.info('Cancelling the Qubole Command Id: %s', cmd_id)
+ log.info('Cancelling the Qubole Command Id: %s', cmd_id)
cmd.cancel()
def execute(self, context):
args = self.cls.parse(self.create_cmd_args(context))
self.cmd = self.cls.create(**args)
context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
- logging.info("Qubole command created with Id: %s and Status: %s",
- self.cmd.id, self.cmd.status)
+ self.logger.info(
+ "Qubole command created with Id: %s and Status: %s",
+ self.cmd.id, self.cmd.status
+ )
while not Command.is_done(self.cmd.status):
time.sleep(Qubole.poll_interval)
self.cmd = self.cls.find(self.cmd.id)
- logging.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+ self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
- logging.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+ self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
if self.cmd.status != 'done':
raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -124,7 +126,7 @@ class QuboleHook(BaseHook):
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
self.cmd = self.cls.find(cmd_id)
if self.cls and self.cmd:
- logging.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+ self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
self.cmd.cancel()
def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
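
Because QuboleHook now hands cmd.id and cmd.status to the logger as arguments instead of pre-formatted strings, handlers and filters still see the raw values on the LogRecord and the message is only rendered at emit time. A stdlib-only sketch (CollectingHandler is a made-up name):

import logging


class CollectingHandler(logging.Handler):
    # Hypothetical handler: keeps the rendered messages around for inspection.
    def __init__(self):
        super(CollectingHandler, self).__init__()
        self.messages = []

    def emit(self, record):
        # record.msg is still 'Command Id: %s and Status: %s' at this point;
        # getMessage() interpolates record.args on demand.
        self.messages.append(record.getMessage())


log = logging.getLogger('qubole_example')
log.setLevel(logging.INFO)
log.addHandler(CollectingHandler())
log.info('Command Id: %s and Status: %s', 1234, 'done')
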
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index 936eff8..a8999d6 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -15,16 +15,14 @@
"""
RedisHook module
"""
-
-import logging
-
from redis import StrictRedis
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class RedisHook(BaseHook):
+class RedisHook(BaseHook, LoggingMixin):
"""
Hook to interact with Redis database
"""
@@ -42,7 +40,7 @@ class RedisHook(BaseHook):
self.port = int(conn.port)
self.password = conn.password
self.db = int(conn.extra_dejson.get('db', 0))
- self.logger = logging.getLogger(__name__)
+
self.logger.debug(
'''Connection "{conn}":
\thost: {host}
@@ -62,11 +60,9 @@ class RedisHook(BaseHook):
"""
if not self.client:
self.logger.debug(
- 'generating Redis client for conn_id "{conn}" on '
- '{host}:{port}:{db}'.format(conn=self.redis_conn_id,
- host=self.host,
- port=self.port,
- db=self.db))
+ 'generating Redis client for conn_id "%s" on %s:%s:%s',
+ self.redis_conn_id, self.host, self.port, self.db
+ )
try:
self.client = StrictRedis(
host=self.host,