You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/19 08:17:33 UTC
[4/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
[AIRFLOW-1604] Rename logger to log
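In most popular languages, the variable name `log` is the de facto
standard for logging, so the `logger` attribute of LoggingMixin is
renamed to `log`. The module LoggingMixin.py is also renamed to
logging_mixin.py to comply with the Python (PEP 8) convention of
lowercase module names.
Accessing the old `.logger` attribute still works, but emits a
deprecation warning.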
Closes #2604 from Fokko/AIRFLOW-1604-logger-to-log
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb2f5890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb2f5890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb2f5890
Branch: refs/heads/master
Commit: eb2f589099b87743482c2eb16261b49e284dcd96
Parents: 8e253c7
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Tue Sep 19 10:17:14 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Sep 19 10:17:14 2017 +0200
----------------------------------------------------------------------
airflow/__init__.py | 4 +-
airflow/api/__init__.py | 4 +-
airflow/api/auth/backend/kerberos_auth.py | 4 +-
airflow/bin/cli.py | 10 +-
airflow/configuration.py | 4 +-
.../auth/backends/github_enterprise_auth.py | 4 +-
airflow/contrib/auth/backends/google_auth.py | 4 +-
airflow/contrib/auth/backends/kerberos_auth.py | 2 +-
airflow/contrib/auth/backends/ldap_auth.py | 4 +-
airflow/contrib/auth/backends/password_auth.py | 4 +-
airflow/contrib/executors/mesos_executor.py | 34 +--
airflow/contrib/hooks/bigquery_hook.py | 24 +--
airflow/contrib/hooks/cloudant_hook.py | 4 +-
airflow/contrib/hooks/databricks_hook.py | 8 +-
airflow/contrib/hooks/datadog_hook.py | 6 +-
airflow/contrib/hooks/datastore_hook.py | 4 +-
airflow/contrib/hooks/ftp_hook.py | 6 +-
airflow/contrib/hooks/gcp_api_base_hook.py | 6 +-
airflow/contrib/hooks/gcp_dataflow_hook.py | 14 +-
airflow/contrib/hooks/gcp_dataproc_hook.py | 18 +-
airflow/contrib/hooks/gcp_mlengine_hook.py | 18 +-
airflow/contrib/hooks/gcs_hook.py | 4 +-
airflow/contrib/hooks/jira_hook.py | 4 +-
airflow/contrib/hooks/qubole_hook.py | 12 +-
airflow/contrib/hooks/redis_hook.py | 6 +-
airflow/contrib/hooks/salesforce_hook.py | 14 +-
airflow/contrib/hooks/spark_sql_hook.py | 6 +-
airflow/contrib/hooks/spark_submit_hook.py | 14 +-
airflow/contrib/hooks/sqoop_hook.py | 10 +-
airflow/contrib/hooks/ssh_hook.py | 14 +-
airflow/contrib/operators/bigquery_operator.py | 2 +-
.../operators/bigquery_table_delete_operator.py | 2 +-
.../contrib/operators/bigquery_to_bigquery.py | 2 +-
airflow/contrib/operators/bigquery_to_gcs.py | 6 +-
.../contrib/operators/databricks_operator.py | 12 +-
airflow/contrib/operators/dataproc_operator.py | 16 +-
.../operators/datastore_export_operator.py | 2 +-
.../operators/datastore_import_operator.py | 2 +-
airflow/contrib/operators/ecs_operator.py | 12 +-
.../contrib/operators/emr_add_steps_operator.py | 4 +-
.../operators/emr_create_job_flow_operator.py | 4 +-
.../emr_terminate_job_flow_operator.py | 4 +-
airflow/contrib/operators/file_to_wasb.py | 2 +-
airflow/contrib/operators/fs_operator.py | 2 +-
.../contrib/operators/gcs_download_operator.py | 4 +-
airflow/contrib/operators/gcs_to_bq.py | 2 +-
airflow/contrib/operators/hipchat_operator.py | 4 +-
airflow/contrib/operators/mlengine_operator.py | 14 +-
airflow/contrib/operators/mysql_to_gcs.py | 2 +-
airflow/contrib/operators/sftp_operator.py | 4 +-
airflow/contrib/operators/vertica_operator.py | 2 +-
airflow/contrib/operators/vertica_to_hive.py | 4 +-
airflow/contrib/sensors/bigquery_sensor.py | 2 +-
airflow/contrib/sensors/datadog_sensor.py | 2 +-
airflow/contrib/sensors/emr_base_sensor.py | 4 +-
airflow/contrib/sensors/emr_job_flow_sensor.py | 2 +-
airflow/contrib/sensors/emr_step_sensor.py | 2 +-
airflow/contrib/sensors/ftp_sensor.py | 2 +-
airflow/contrib/sensors/gcs_sensor.py | 4 +-
airflow/contrib/sensors/hdfs_sensors.py | 6 +-
airflow/contrib/sensors/jira_sensor.py | 14 +-
airflow/contrib/sensors/redis_key_sensor.py | 2 +-
airflow/contrib/sensors/wasb_sensor.py | 4 +-
.../contrib/task_runner/cgroup_task_runner.py | 20 +-
airflow/executors/__init__.py | 4 +-
airflow/executors/base_executor.py | 14 +-
airflow/executors/celery_executor.py | 16 +-
airflow/executors/dask_executor.py | 4 +-
airflow/executors/local_executor.py | 6 +-
airflow/executors/sequential_executor.py | 4 +-
airflow/hooks/S3_hook.py | 10 +-
airflow/hooks/base_hook.py | 4 +-
airflow/hooks/dbapi_hook.py | 6 +-
airflow/hooks/druid_hook.py | 4 +-
airflow/hooks/hive_hooks.py | 26 +--
airflow/hooks/http_hook.py | 6 +-
airflow/hooks/oracle_hook.py | 8 +-
airflow/hooks/pig_hook.py | 4 +-
airflow/hooks/webhdfs_hook.py | 12 +-
airflow/hooks/zendesk_hook.py | 4 +-
airflow/jobs.py | 206 +++++++++----------
airflow/models.py | 126 ++++++------
airflow/operators/bash_operator.py | 14 +-
airflow/operators/check_operator.py | 20 +-
airflow/operators/dagrun_operator.py | 4 +-
airflow/operators/docker_operator.py | 10 +-
airflow/operators/generic_transfer.py | 10 +-
airflow/operators/hive_operator.py | 2 +-
airflow/operators/hive_stats_operator.py | 8 +-
airflow/operators/hive_to_druid.py | 10 +-
airflow/operators/hive_to_mysql.py | 10 +-
airflow/operators/hive_to_samba_operator.py | 4 +-
airflow/operators/http_operator.py | 2 +-
airflow/operators/jdbc_operator.py | 2 +-
airflow/operators/latest_only_operator.py | 12 +-
airflow/operators/mssql_operator.py | 2 +-
airflow/operators/mssql_to_hive.py | 4 +-
airflow/operators/mysql_operator.py | 2 +-
airflow/operators/mysql_to_hive.py | 4 +-
airflow/operators/oracle_operator.py | 2 +-
airflow/operators/pig_operator.py | 2 +-
airflow/operators/postgres_operator.py | 2 +-
airflow/operators/presto_to_mysql.py | 8 +-
airflow/operators/python_operator.py | 36 ++--
airflow/operators/redshift_to_s3_operator.py | 6 +-
airflow/operators/s3_file_transform_operator.py | 12 +-
airflow/operators/s3_to_hive_operator.py | 24 +--
airflow/operators/sensors.py | 34 +--
airflow/operators/slack_operator.py | 2 +-
airflow/operators/sqlite_operator.py | 2 +-
airflow/plugins_manager.py | 4 +-
airflow/security/kerberos.py | 2 +-
airflow/settings.py | 4 +-
airflow/task_runner/base_task_runner.py | 10 +-
airflow/task_runner/bash_task_runner.py | 2 +-
airflow/utils/dag_processing.py | 18 +-
airflow/utils/db.py | 4 +-
airflow/utils/email.py | 4 +-
airflow/utils/log/LoggingMixin.py | 45 ----
airflow/utils/log/gcs_task_handler.py | 8 +-
airflow/utils/log/logging_mixin.py | 61 ++++++
airflow/utils/log/s3_task_handler.py | 8 +-
airflow/utils/timeout.py | 12 +-
airflow/www/api/experimental/endpoints.py | 4 +-
airflow/www/app.py | 2 +-
scripts/perf/scheduler_ops_metrics.py | 4 +-
tests/contrib/hooks/test_databricks_hook.py | 2 +-
.../contrib/operators/test_dataproc_operator.py | 8 +-
tests/contrib/sensors/test_hdfs_sensors.py | 62 +++---
tests/executors/test_executor.py | 4 +-
tests/operators/sensors.py | 4 +-
tests/test_utils/reset_warning_registry.py | 82 ++++++++
tests/utils/log/test_logging.py | 6 +-
tests/utils/test_logging_mixin.py | 50 +++++
134 files changed, 858 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 8844eeb..3c5f24c 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,7 +21,7 @@ in their PYTHONPATH. airflow_login should be based off the
"""
from builtins import object
from airflow import version
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
__version__ = version.version
@@ -41,7 +41,7 @@ login = None
def load_login():
- log = LoggingMixin().logger
+ log = LoggingMixin().log
auth_backend = 'airflow.default_login'
try:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 39edbed..31a303b 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -17,11 +17,11 @@ from airflow.exceptions import AirflowException
from airflow import configuration as conf
from importlib import import_module
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
api_auth = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def load_auth():
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index 73a5aa2..a904d59 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -24,7 +24,7 @@
from future.standard_library import install_aliases
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
install_aliases()
@@ -47,7 +47,7 @@ client_auth = HTTPKerberosAuth(service='airflow')
_SERVICE_NAME = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def init_app(app):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 56f1855..5035a66 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,7 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www.app import cached_app
from sqlalchemy import func
@@ -64,7 +64,7 @@ api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.api_auth.client_auth)
-log = LoggingMixin().logger
+log = LoggingMixin().log
def sigint_handler(sig, frame):
@@ -189,7 +189,7 @@ def trigger_dag(args):
:param args:
:return:
"""
- log = LoggingMixin().logger
+ log = LoggingMixin().log
try:
message = api_client.trigger_dag(dag_id=args.dag_id,
run_id=args.run_id,
@@ -202,7 +202,7 @@ def trigger_dag(args):
def pool(args):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
def _tabulate(pools):
return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
@@ -330,7 +330,7 @@ def run(args, dag=None):
if dag:
args.dag_id = dag.dag_id
- log = LoggingMixin().logger
+ log = LoggingMixin().log
# Load custom airflow config
if args.cfg_path:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index db196f9..ff81d98 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,7 +28,7 @@ import sys
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
@@ -38,7 +38,7 @@ from six.moves import configparser
from airflow.exceptions import AirflowConfigException
-log = LoggingMixin().logger
+log = LoggingMixin().log
# show Airflow's deprecation warnings
warnings.filterwarnings(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 459e9c9..28c3cfc 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -27,9 +27,9 @@ from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
from airflow.configuration import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def get_config_param(param):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index f38f725..e6eab94 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -26,9 +26,9 @@ from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def get_config_param(param):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index ffb711f..908ebc9 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,7 +29,7 @@ from flask import url_for, redirect
from airflow import settings
from airflow import models
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 8ce0875..b056851 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -33,13 +33,13 @@ from airflow.configuration import AirflowConfigException
import traceback
import re
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
class AuthenticationError(Exception):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 3ad2a8b..8adb1f4 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,13 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
from airflow import settings
from airflow import models
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
PY3 = version_info[0] == 3
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 19d72ed..8728566 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -14,7 +14,7 @@
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www.utils import LoginMixin
standard_library.install_aliases()
@@ -65,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
self.task_key_map = {}
def registered(self, driver, frameworkId, masterInfo):
- self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
+ self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
# Import here to work around a circular import error
@@ -86,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
Session.remove()
def reregistered(self, driver, masterInfo):
- self.logger.info("AirflowScheduler re-registered to mesos")
+ self.log.info("AirflowScheduler re-registered to mesos")
def disconnected(self, driver):
- self.logger.info("AirflowScheduler disconnected from mesos")
+ self.log.info("AirflowScheduler disconnected from mesos")
def offerRescinded(self, driver, offerId):
- self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
+ self.log.info("AirflowScheduler offer %s rescinded", str(offerId))
def frameworkMessage(self, driver, executorId, slaveId, message):
- self.logger.info("AirflowScheduler received framework message %s", message)
+ self.log.info("AirflowScheduler received framework message %s", message)
def executorLost(self, driver, executorId, slaveId, status):
- self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
+ self.log.warning("AirflowScheduler executor %s lost", str(executorId))
def slaveLost(self, driver, slaveId):
- self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
+ self.log.warning("AirflowScheduler slave %s lost", str(slaveId))
def error(self, driver, message):
- self.logger.error("AirflowScheduler driver aborted %s", message)
+ self.log.error("AirflowScheduler driver aborted %s", message)
raise AirflowException("AirflowScheduler driver aborted %s" % message)
def resourceOffers(self, driver, offers):
@@ -118,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
elif resource.name == "mem":
offerMem += resource.scalar.value
- self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+ self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
remainingCpus = offerCpus
remainingMem = offerMem
@@ -131,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
self.task_counter += 1
self.task_key_map[str(tid)] = key
- self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
+ self.log.info("Launching task %d using offer %s", tid, offer.id.value)
task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
@@ -161,7 +161,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
driver.launchTasks(offer.id, tasks)
def statusUpdate(self, driver, update):
- self.logger.info(
+ self.log.info(
"Task %s is in state %s, data %s",
update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
)
@@ -171,7 +171,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
except KeyError:
# The map may not contain an item if the framework re-registered after a failover.
# Discard these tasks.
- self.logger.warning("Unrecognised task key %s", update.task_id.value)
+ self.log.warning("Unrecognised task key %s", update.task_id.value)
return
if update.state == mesos_pb2.TASK_FINISHED:
@@ -203,7 +203,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
framework.user = ''
if not configuration.get('mesos', 'MASTER'):
- self.logger.error("Expecting mesos master URL for mesos executor")
+ self.log.error("Expecting mesos master URL for mesos executor")
raise AirflowException("mesos.master not provided for mesos executor")
master = configuration.get('mesos', 'MASTER')
@@ -239,7 +239,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
else:
framework.checkpoint = False
- self.logger.info(
+ self.log.info(
'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
)
@@ -248,10 +248,10 @@ class MesosExecutor(BaseExecutor, LoginMixin):
if configuration.getboolean('mesos', 'AUTHENTICATE'):
if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
- self.logger.error("Expecting authentication principal in the environment")
+ self.log.error("Expecting authentication principal in the environment")
raise AirflowException("mesos.default_principal not provided in authenticated mode")
if not configuration.get('mesos', 'DEFAULT_SECRET'):
- self.logger.error("Expecting authentication secret in the environment")
+ self.log.error("Expecting authentication secret in the environment")
raise AirflowException("mesos.default_secret not provided in authenticated mode")
credential = mesos_pb2.Credential()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 497fa28..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -32,7 +32,7 @@ from past.builtins import basestring
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
@@ -499,7 +499,7 @@ class BigQueryBaseCursor(LoggingMixin):
"'WRITE_APPEND' or 'WRITE_TRUNCATE'."
)
else:
- self.logger.info(
+ self.log.info(
"Adding experimental "
"'schemaUpdateOptions': {0}".format(schema_update_options)
)
@@ -576,12 +576,12 @@ class BigQueryBaseCursor(LoggingMixin):
)
)
else:
- self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+ self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
time.sleep(5)
except HttpError as err:
if err.resp.status in [500, 503]:
- self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+ self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
time.sleep(5)
else:
raise Exception(
@@ -660,14 +660,14 @@ class BigQueryBaseCursor(LoggingMixin):
datasetId=deletion_dataset,
tableId=deletion_table) \
.execute()
- self.logger.info('Deleted table %s:%s.%s.',
- deletion_project, deletion_dataset, deletion_table)
+ self.log.info('Deleted table %s:%s.%s.',
+ deletion_project, deletion_dataset, deletion_table)
except HttpError:
if not ignore_if_missing:
raise Exception(
'Table deletion failed. Table does not exist.')
else:
- self.logger.info('Table does not exist. Skipping.')
+ self.log.info('Table does not exist. Skipping.')
def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -694,7 +694,7 @@ class BigQueryBaseCursor(LoggingMixin):
for table in tables_list_resp.get('tables', []):
if table['tableReference']['tableId'] == table_id:
# found the table, do update
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s exists, updating.',
project_id, dataset_id, table_id
)
@@ -712,7 +712,7 @@ class BigQueryBaseCursor(LoggingMixin):
# If there is no next page, then the table doesn't exist.
else:
# do insert
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s does not exist. creating.',
project_id, dataset_id, table_id
)
@@ -759,7 +759,7 @@ class BigQueryBaseCursor(LoggingMixin):
'tableId': view_table}}
# check to see if the view we want to add already exists.
if view_access not in access:
- self.logger.info(
+ self.log.info(
'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project, source_dataset
)
@@ -769,7 +769,7 @@ class BigQueryBaseCursor(LoggingMixin):
body={'access': access}).execute()
else:
# if view is already in access, do nothing.
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project, source_dataset
)
@@ -1032,7 +1032,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
if project_id is None:
if var_name is not None:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.info(
'Project not included in {var}: {input}; using project "{project}"'.format(
var=var_name, input=table_input, project=default_project_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index d9db08d..cbb0cca 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -18,7 +18,7 @@ import cloudant
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class CloudantHook(BaseHook):
@@ -35,7 +35,7 @@ class CloudantHook(BaseHook):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.debug(
'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
s
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 7b20433..cd9dc54 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -20,7 +20,7 @@ from airflow.hooks.base_hook import BaseHook
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
try:
from urllib import parse as urlparse
@@ -100,10 +100,10 @@ class DatabricksHook(BaseHook, LoggingMixin):
host=self._parse_host(self.databricks_conn.host),
endpoint=endpoint)
if 'token' in self.databricks_conn.extra_dejson:
- self.logger.info('Using token auth.')
+ self.log.info('Using token auth.')
auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
else:
- self.logger.info('Using basic auth.')
+ self.log.info('Using basic auth.')
auth = (self.databricks_conn.login, self.databricks_conn.password)
if method == 'GET':
request_func = requests.get
@@ -129,7 +129,7 @@ class DatabricksHook(BaseHook, LoggingMixin):
response.content, response.status_code))
except (requests_exceptions.ConnectionError,
requests_exceptions.Timeout) as e:
- self.logger.error(
+ self.log.error(
'Attempt %s API Request to Databricks failed with reason: %s',
attempt_num, e
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 0f5af00..6caf611 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -17,7 +17,7 @@ from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from datadog import initialize, api
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class DatadogHook(BaseHook, LoggingMixin):
@@ -47,7 +47,7 @@ class DatadogHook(BaseHook, LoggingMixin):
if self.app_key is None:
raise AirflowException("app_key must be specified in the Datadog connection details")
- self.logger.info("Setting up api keys for Datadog")
+ self.log.info("Setting up api keys for Datadog")
options = {
'api_key': self.api_key,
'app_key': self.app_key
@@ -56,7 +56,7 @@ class DatadogHook(BaseHook, LoggingMixin):
def validate_response(self, response):
if response['status'] != 'ok':
- self.logger.error("Datadog returned: %s", response)
+ self.log.error("Datadog returned: %s", response)
raise AirflowException("Error status received from Datadog")
def send_metric(self, metric_name, datapoint, tags=None):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 2ff1600..cf98dc7 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -136,8 +136,8 @@ class DatastoreHook(GoogleCloudBaseHook):
result = self.get_operation(name)
state = result['metadata']['common']['state']
if state == 'PROCESSING':
- self.logger.info('Operation is processing. Re-polling state in {} seconds'
- .format(polling_interval_in_seconds))
+ self.log.info('Operation is processing. Re-polling state in {} seconds'
+ .format(polling_interval_in_seconds))
time.sleep(polling_interval_in_seconds)
else:
return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index a6b3181..b1e224d 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -19,7 +19,7 @@ import os.path
from airflow.hooks.base_hook import BaseHook
from past.builtins import basestring
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def mlsd(conn, path="", facts=None):
@@ -167,9 +167,9 @@ class FTPHook(BaseHook, LoggingMixin):
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
- self.logger.info('Retrieving file from FTP: %s', remote_full_path)
+ self.log.info('Retrieving file from FTP: %s', remote_full_path)
conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
- self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
+ self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
if is_path:
output_handle.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 7476c90..28721d3 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -18,7 +18,7 @@ from oauth2client.service_account import ServiceAccountCredentials
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class GoogleCloudBaseHook(BaseHook, LoggingMixin):
@@ -66,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
kwargs['sub'] = self.delegate_to
if not key_path:
- self.logger.info('Getting connection using `gcloud auth` user, since no key file '
+ self.log.info('Getting connection using `gcloud auth` user, since no key file '
'is defined for hook.')
credentials = GoogleCredentials.get_application_default()
else:
@@ -74,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
raise AirflowException('Scope should be defined when using a key file.')
scopes = [s.strip() for s in scope.split(',')]
if key_path.endswith('.json'):
- self.logger.info('Getting connection using a JSON key file.')
+ self.log.info('Getting connection using a JSON key file.')
credentials = ServiceAccountCredentials\
.from_json_keyfile_name(key_path, scopes)
elif key_path.endswith('.p12'):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f5767bd..b1a1e0e 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -19,7 +19,7 @@ import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class _DataflowJob(LoggingMixin):
@@ -48,12 +48,12 @@ class _DataflowJob(LoggingMixin):
job = self._dataflow.projects().jobs().get(projectId=self._project_number,
jobId=self._job_id).execute()
if 'currentState' in job:
- self.logger.info(
+ self.log.info(
'Google Cloud DataFlow job %s is %s',
job['name'], job['currentState']
)
else:
- self.logger.info(
+ self.log.info(
'Google Cloud DataFlow with job_id %s has name %s',
self._job_id, job['name']
)
@@ -75,7 +75,7 @@ class _DataflowJob(LoggingMixin):
elif 'JOB_STATE_PENDING' == self._job['currentState']:
time.sleep(15)
else:
- self.logger.debug(str(self._job))
+ self.log.debug(str(self._job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
self._job['name'], self._job['currentState']))
@@ -109,15 +109,15 @@ class _Dataflow(LoggingMixin):
def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
- self.logger.info("Start waiting for DataFlow process to complete.")
+ self.log.info("Start waiting for DataFlow process to complete.")
while self._proc.poll() is None:
ret = select.select(reads, [], [], 5)
if ret is not None:
for fd in ret[0]:
line = self._line(fd)
- self.logger.debug(line[:-1])
+ self.log.debug(line[:-1])
else:
- self.logger.info("Waiting for DataFlow process to complete.")
+ self.log.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3a1336e..c964f4c 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -18,7 +18,7 @@ import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class _DataProcJob(LoggingMixin):
@@ -30,7 +30,7 @@ class _DataProcJob(LoggingMixin):
region='global',
body=job).execute()
self.job_id = self.job['reference']['jobId']
- self.logger.info(
+ self.log.info(
'DataProc job %s is %s',
self.job_id, str(self.job['status']['state'])
)
@@ -43,20 +43,20 @@ class _DataProcJob(LoggingMixin):
jobId=self.job_id).execute()
if 'ERROR' == self.job['status']['state']:
print(str(self.job))
- self.logger.error('DataProc job %s has errors', self.job_id)
- self.logger.error(self.job['status']['details'])
- self.logger.debug(str(self.job))
+ self.log.error('DataProc job %s has errors', self.job_id)
+ self.log.error(self.job['status']['details'])
+ self.log.debug(str(self.job))
return False
if 'CANCELLED' == self.job['status']['state']:
print(str(self.job))
- self.logger.warning('DataProc job %s is cancelled', self.job_id)
+ self.log.warning('DataProc job %s is cancelled', self.job_id)
if 'details' in self.job['status']:
- self.logger.warning(self.job['status']['details'])
- self.logger.debug(str(self.job))
+ self.log.warning(self.job['status']['details'])
+ self.log.debug(str(self.job))
return False
if 'DONE' == self.job['status']['state']:
return True
- self.logger.debug(
+ self.log.debug(
'DataProc job %s is %s',
self.job_id, str(self.job['status']['state'])
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 35f31a7..c17b614 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -20,11 +20,11 @@ from apiclient.discovery import build
from oauth2client.client import GoogleCredentials
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
for i in range(0, max_n):
try:
@@ -103,18 +103,18 @@ class MLEngineHook(GoogleCloudBaseHook):
if use_existing_job_fn is not None:
existing_job = self._get_job(project_id, job_id)
if not use_existing_job_fn(existing_job):
- self.logger.error(
+ self.log.error(
'Job with job_id %s already exist, but it does '
'not match our expectation: %s',
job_id, existing_job
)
raise
- self.logger.info(
+ self.log.info(
'Job with job_id %s already exist. Will waiting for it to finish',
job_id
)
else:
- self.logger.error('Failed to create MLEngine job: {}'.format(e))
+ self.log.error('Failed to create MLEngine job: {}'.format(e))
raise
return self._wait_for_job_done(project_id, job_id)
@@ -139,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
# polling after 30 seconds when quota failure occurs
time.sleep(30)
else:
- self.logger.error('Failed to get MLEngine job: {}'.format(e))
+ self.log.error('Failed to get MLEngine job: {}'.format(e))
raise
def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -191,10 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
try:
response = request.execute()
- self.logger.info('Successfully set version: %s to default', response)
+ self.log.info('Successfully set version: %s to default', response)
return response
except errors.HttpError as e:
- self.logger.error('Something went wrong: %s', e)
+ self.log.error('Something went wrong: %s', e)
raise
def list_versions(self, project_id, model_name):
@@ -262,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
return request.execute()
except errors.HttpError as e:
if e.resp.status == 404:
- self.logger.error('Model was not found: %s', e)
+ self.log.error('Model was not found: %s', e)
return None
raise
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index eb17c3b..24c247e 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -182,7 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
ts = ts.replace(tzinfo=dateutil.tz.tzutc())
updated = dateutil.parser.parse(response['updated'])
- self.logger.info("Verify object date: %s > %s", updated, ts)
+ self.log.info("Verify object date: %s > %s", updated, ts)
if updated > ts:
return True
@@ -247,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
).execute()
if 'items' not in response:
- self.logger.info("No items found for prefix: %s", prefix)
+ self.log.info("No items found for prefix: %s", prefix)
break
for item in response['items']:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 8702608..21e669f 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -16,7 +16,7 @@ from jira.exceptions import JIRAError
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class JiraHook(BaseHook, LoggingMixin):
@@ -35,7 +35,7 @@ class JiraHook(BaseHook, LoggingMixin):
def get_conn(self):
if not self.client:
- self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
+ self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
get_server_info = True
validate = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 1a5e7ec..833c1c7 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -21,7 +21,7 @@ import six
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from qds_sdk.qubole import Qubole
@@ -86,7 +86,7 @@ class QuboleHook(BaseHook, LoggingMixin):
if cmd_id is not None:
cmd = Command.find(cmd_id)
if cmd is not None:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
if cmd.status == 'done':
log.info('Command ID: %s has been succeeded, hence marking this '
'TI as Success.', cmd_id)
@@ -99,7 +99,7 @@ class QuboleHook(BaseHook, LoggingMixin):
args = self.cls.parse(self.create_cmd_args(context))
self.cmd = self.cls.create(**args)
context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
- self.logger.info(
+ self.log.info(
"Qubole command created with Id: %s and Status: %s",
self.cmd.id, self.cmd.status
)
@@ -107,10 +107,10 @@ class QuboleHook(BaseHook, LoggingMixin):
while not Command.is_done(self.cmd.status):
time.sleep(Qubole.poll_interval)
self.cmd = self.cls.find(self.cmd.id)
- self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+ self.log.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
- self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+ self.log.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
if self.cmd.status != 'done':
raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -126,7 +126,7 @@ class QuboleHook(BaseHook, LoggingMixin):
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
self.cmd = self.cls.find(cmd_id)
if self.cls and self.cmd:
- self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+ self.log.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
self.cmd.cancel()
def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index a8999d6..278e196 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -19,7 +19,7 @@ from redis import StrictRedis
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class RedisHook(BaseHook, LoggingMixin):
@@ -41,7 +41,7 @@ class RedisHook(BaseHook, LoggingMixin):
self.password = conn.password
self.db = int(conn.extra_dejson.get('db', 0))
- self.logger.debug(
+ self.log.debug(
'''Connection "{conn}":
\thost: {host}
\tport: {port}
@@ -59,7 +59,7 @@ class RedisHook(BaseHook, LoggingMixin):
Returns a Redis connection.
"""
if not self.client:
- self.logger.debug(
+ self.log.debug(
'generating Redis client for conn_id "%s" on %s:%s:%s',
self.redis_conn_id, self.host, self.port, self.db
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index f2b5fef..0d0a104 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -29,7 +29,7 @@ import json
import pandas as pd
import time
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SalesforceHook(BaseHook, LoggingMixin):
@@ -92,10 +92,10 @@ class SalesforceHook(BaseHook, LoggingMixin):
"""
self.sign_in()
- self.logger.info("Querying for all objects")
+ self.log.info("Querying for all objects")
query = self.sf.query_all(query)
- self.logger.info(
+ self.log.info(
"Received results: Total size: %s; Done: %s",
query['totalSize'], query['done']
)
@@ -144,7 +144,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
field_string = self._build_field_list(fields)
query = "SELECT {0} FROM {1}".format(field_string, obj)
- self.logger.info(
+ self.log.info(
"Making query to Salesforce: %s",
query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
)
@@ -169,7 +169,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
try:
col = pd.to_datetime(col)
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning(
"Could not convert field to timestamps: %s", col.name
)
@@ -265,7 +265,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
# for each returned record
object_name = query_results[0]['attributes']['type']
- self.logger.info("Coercing timestamps for: %s", object_name)
+ self.log.info("Coercing timestamps for: %s", object_name)
schema = self.describe_object(object_name)
@@ -299,7 +299,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
# there are also a ton of newline objects
# that mess up our ability to write to csv
# we remove these newlines so that the output is a valid CSV format
- self.logger.info("Cleaning data and writing to CSV")
+ self.log.info("Cleaning data and writing to CSV")
possible_strings = df.columns[df.dtypes == "object"]
df[possible_strings] = df[possible_strings].apply(
lambda x: x.str.replace("\r\n", "")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index aa16130..6973023 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -16,7 +16,7 @@ import subprocess
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SparkSqlHook(BaseHook, LoggingMixin):
@@ -121,7 +121,7 @@ class SparkSqlHook(BaseHook, LoggingMixin):
connection_cmd += ["--queue", self._yarn_queue]
connection_cmd += cmd
- self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
+ self.log.debug("Spark-Sql cmd: %s", connection_cmd)
return connection_cmd
@@ -151,5 +151,5 @@ class SparkSqlHook(BaseHook, LoggingMixin):
def kill(self):
if self._sp and self._sp.poll() is None:
- self.logger.info("Killing the Spark-Sql job")
+ self.log.info("Killing the Spark-Sql job")
self._sp.kill()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index bdd1efe..7d59cd2 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -18,7 +18,7 @@ import re
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -123,7 +123,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
conn_data['spark_home'] = extra.get('spark-home', None)
conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
except AirflowException:
- self.logger.debug(
+ self.log.debug(
"Could not load connection string %s, defaulting to %s",
self._conn_id, conn_data['master']
)
@@ -192,7 +192,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
if self._application_args:
connection_cmd += self._application_args
- self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
+ self.log.debug("Spark-Submit cmd: %s", connection_cmd)
return connection_cmd
@@ -239,15 +239,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
self._yarn_application_id = match.groups()[0]
# Pass to logging
- self.logger.info(line)
+ self.log.info(line)
def on_kill(self):
if self._sp and self._sp.poll() is None:
- self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
+ self.log.info('Sending kill signal to %s', self._connection['spark_binary'])
self._sp.kill()
if self._yarn_application_id:
- self.logger.info('Killing application on YARN')
+ self.log.info('Killing application on YARN')
kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.logger.info("YARN killed with return code: %s", yarn_kill.wait())
+ self.log.info("YARN killed with return code: %s", yarn_kill.wait())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 0584df4..5b00b15 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -20,7 +20,7 @@ import subprocess
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SqoopHook(BaseHook, LoggingMixin):
@@ -76,7 +76,7 @@ class SqoopHook(BaseHook, LoggingMixin):
password_index = cmd.index('--password')
cmd[password_index + 1] = 'MASKED'
except ValueError:
- self.logger.debug("No password in sqoop cmd")
+ self.log.debug("No password in sqoop cmd")
return cmd
def Popen(self, cmd, **kwargs):
@@ -87,18 +87,18 @@ class SqoopHook(BaseHook, LoggingMixin):
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
"""
- self.logger.info("Executing command: %s", ' '.join(cmd))
+ self.log.info("Executing command: %s", ' '.join(cmd))
sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
for line in iter(sp.stdout):
- self.logger.info(line.strip())
+ self.log.info(line.strip())
sp.wait()
- self.logger.info("Command exited with return code %s", sp.returncode)
+ self.log.info("Command exited with return code %s", sp.returncode)
if sp.returncode:
raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index 3fe9146..b061fd7 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -23,7 +23,7 @@ import paramiko
from contextlib import contextmanager
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SSHHook(BaseHook, LoggingMixin):
@@ -70,7 +70,7 @@ class SSHHook(BaseHook, LoggingMixin):
def get_conn(self):
if not self.client:
- self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
+ self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
if self.ssh_conn_id is not None:
conn = self.get_connection(self.ssh_conn_id)
if self.username is None:
@@ -98,7 +98,7 @@ class SSHHook(BaseHook, LoggingMixin):
# Auto detecting username values from system
if not self.username:
- self.logger.debug(
+ self.log.debug(
"username to ssh to host: %s is not specified for connection id"
" %s. Using system's default provided by getpass.getuser()",
self.remote_host, self.ssh_conn_id
@@ -142,17 +142,17 @@ class SSHHook(BaseHook, LoggingMixin):
self.client = client
except paramiko.AuthenticationException as auth_error:
- self.logger.error(
+ self.log.error(
"Auth failed while connecting to host: %s, error: %s",
self.remote_host, auth_error
)
except paramiko.SSHException as ssh_error:
- self.logger.error(
+ self.log.error(
"Failed connecting to host: %s, error: %s",
self.remote_host, ssh_error
)
except Exception as error:
- self.logger.error(
+ self.log.error(
"Error connecting to host: %s, error: %s",
self.remote_host, error
)
@@ -191,7 +191,7 @@ class SSHHook(BaseHook, LoggingMixin):
]
ssh_cmd += ssh_tunnel_cmd
- self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
+ self.log.debug("Creating tunnel with cmd: %s", ssh_cmd)
proc = subprocess.Popen(ssh_cmd,
stdin=subprocess.PIPE,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 37e4a97..a2ba824 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -87,7 +87,7 @@ class BigQueryOperator(BaseOperator):
self.query_params = query_params
def execute(self, context):
- self.logger.info('Executing: %s', self.bql)
+ self.log.info('Executing: %s', self.bql)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 21de7cc..0f4ef50 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -53,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
self.ignore_if_missing = ignore_if_missing
def execute(self, context):
- self.logger.info('Deleting: %s', self.deletion_dataset_table)
+ self.log.info('Deleting: %s', self.deletion_dataset_table)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 8e21270..2bc4a8b 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -68,7 +68,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info(
+ self.log.info(
'Executing copy of %s into: %s',
self.source_project_dataset_tables, self.destination_project_dataset_table
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 23a2029..800e7bd 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -79,9 +79,9 @@ class BigQueryToCloudStorageOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info('Executing extract of %s into: %s',
- self.source_project_dataset_table,
- self.destination_cloud_storage_uris)
+ self.log.info('Executing extract of %s into: %s',
+ self.source_project_dataset_table,
+ self.destination_cloud_storage_uris)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8773357..cffc4ff 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -214,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
raise AirflowException(msg)
def _log_run_page_url(self, url):
- self.logger.info('View run status, Spark UI, and logs at %s', url)
+ self.log.info('View run status, Spark UI, and logs at %s', url)
def get_hook(self):
return DatabricksHook(
@@ -225,13 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
hook = self.get_hook()
self.run_id = hook.submit_run(self.json)
run_page_url = hook.get_run_page_url(self.run_id)
- self.logger.info('Run submitted with run_id: %s', self.run_id)
+ self.log.info('Run submitted with run_id: %s', self.run_id)
self._log_run_page_url(run_page_url)
while True:
run_state = hook.get_run_state(self.run_id)
if run_state.is_terminal:
if run_state.is_successful:
- self.logger.info('%s completed successfully.', self.task_id)
+ self.log.info('%s completed successfully.', self.task_id)
self._log_run_page_url(run_page_url)
return
else:
@@ -240,15 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
s=run_state)
raise AirflowException(error_message)
else:
- self.logger.info('%s in run state: %s', self.task_id, run_state)
+ self.log.info('%s in run state: %s', self.task_id, run_state)
self._log_run_page_url(run_page_url)
- self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
+ self.log.info('Sleeping for %s seconds.', self.polling_period_seconds)
time.sleep(self.polling_period_seconds)
def on_kill(self):
hook = self.get_hook()
hook.cancel_run(self.run_id)
- self.logger.info(
+ self.log.info(
'Task: %s with run_id: %s was requested to be cancelled.',
self.task_id, self.run_id
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3c22b60..bdb0335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -177,12 +177,12 @@ class DataprocClusterCreateOperator(BaseOperator):
while True:
state = self._get_cluster_state(service)
if state is None:
- self.logger.info("No state for cluster '%s'", self.cluster_name)
+ self.log.info("No state for cluster '%s'", self.cluster_name)
time.sleep(15)
else:
- self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
+ self.log.info("State for cluster '%s' is %s", self.cluster_name, state)
if self._cluster_ready(state, service):
- self.logger.info(
+ self.log.info(
"Cluster '%s' successfully created", self.cluster_name
)
return
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
return cluster_data
def execute(self, context):
- self.logger.info('Creating cluster: %s', self.cluster_name)
+ self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -272,7 +272,7 @@ class DataprocClusterCreateOperator(BaseOperator):
service = hook.get_conn()
if self._get_cluster(service):
- self.logger.info(
+ self.log.info(
'Cluster %s already exists... Checking status...',
self.cluster_name
)
@@ -290,7 +290,7 @@ class DataprocClusterCreateOperator(BaseOperator):
# probably two cluster start commands at the same time
time.sleep(10)
if self._get_cluster(service):
- self.logger.info(
+ self.log.info(
'Cluster {} already exists... Checking status...',
self.cluster_name
)
@@ -358,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
time.sleep(15)
def execute(self, context):
- self.logger.info('Deleting cluster: %s', self.cluster_name)
+ self.log.info('Deleting cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -371,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
clusterName=self.cluster_name
).execute()
operation_name = response['name']
- self.logger.info("Cluster delete operation name: %s", operation_name)
+ self.log.info("Cluster delete operation name: %s", operation_name)
self._wait_for_done(service, operation_name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 76415e1..51e1d06 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -78,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+ self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
if self.overwrite_existing and self.namespace:
gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 74bd940..d8c42e7 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -72,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
+ self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
file=self.file,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 0c75eaa..898a77a 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -56,11 +56,11 @@ class ECSOperator(BaseOperator):
self.hook = self.get_hook()
def execute(self, context):
- self.logger.info(
+ self.log.info(
'Running ECS Task - Task definition: %s - on cluster %s',
self.task_definition,self.cluster
)
- self.logger.info('ECSOperator overrides: %s', self.overrides)
+ self.log.info('ECSOperator overrides: %s', self.overrides)
self.client = self.hook.get_client_type(
'ecs',
@@ -77,13 +77,13 @@ class ECSOperator(BaseOperator):
failures = response['failures']
if len(failures) > 0:
raise AirflowException(response)
- self.logger.info('ECS Task started: %s', response)
+ self.log.info('ECS Task started: %s', response)
self.arn = response['tasks'][0]['taskArn']
self._wait_for_task_ended()
self._check_success_task()
- self.logger.info('ECS Task has been successfully executed: %s', response)
+ self.log.info('ECS Task has been successfully executed: %s', response)
def _wait_for_task_ended(self):
waiter = self.client.get_waiter('tasks_stopped')
@@ -98,7 +98,7 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
tasks=[self.arn]
)
- self.logger.info('ECS Task stopped, check status: %s', response)
+ self.log.info('ECS Task stopped, check status: %s', response)
if len(response.get('failures', [])) > 0:
raise AirflowException(response)
@@ -124,4 +124,4 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
task=self.arn,
reason='Task killed by the user')
- self.logger.info(response)
+ self.log.info(response)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index dbf764e..227474e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -48,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Adding steps to %s', self.job_flow_id)
+ self.log.info('Adding steps to %s', self.job_flow_id)
response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('Adding steps failed: %s' % response)
else:
- self.logger.info('Steps %s added to JobFlow', response['StepIds'])
+ self.log.info('Steps %s added to JobFlow', response['StepIds'])
return response['StepIds']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 4e40b17..2544adf 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -50,7 +50,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
- self.logger.info(
+ self.log.info(
'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
self.aws_conn_id, self.emr_conn_id
)
@@ -59,5 +59,5 @@ class EmrCreateJobFlowOperator(BaseOperator):
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow creation failed: %s' % response)
else:
- self.logger.info('JobFlow with id %s created', response['JobFlowId'])
+ self.log.info('JobFlow with id %s created', response['JobFlowId'])
return response['JobFlowId']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index df641ad..ec29897 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -43,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Terminating JobFlow %s', self.job_flow_id)
+ self.log.info('Terminating JobFlow %s', self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow termination failed: %s' % response)
else:
- self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
+ self.log.info('JobFlow with id %s terminated', self.job_flow_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 4519e1e..3478dd3 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -51,7 +51,7 @@ class FileToWasbOperator(BaseOperator):
def execute(self, context):
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
- self.logger.info(
+ self.log.info(
'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
)
hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index ca7d716..e7640c8 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -48,7 +48,7 @@ class FileSensor(BaseSensorOperator):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = "/".join([basepath, self.filepath])
- self.logger.info('Poking for file {full_path}'.format(**locals()))
+ self.log.info('Poking for file {full_path}'.format(**locals()))
try:
files = [f for f in walk(full_path)]
except:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 27e85b7..53516b1 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -65,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+ self.log.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -74,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
- self.logger.info(file_bytes)
+ self.log.info(file_bytes)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 01f53cc..730a3bc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -189,7 +189,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.destination_project_dataset_table))
row = cursor.fetchone()
max_id = row[0] if row[0] else 0
- self.logger.info(
+ self.log.info(
'Loaded BQ data with max %s.%s=%s',
self.destination_project_dataset_table, self.max_id_key, max_id
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 19c6d76..d82ad61 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -66,8 +66,8 @@ class HipChatAPIOperator(BaseOperator):
'Authorization': 'Bearer %s' % self.token},
data=self.body)
if response.status_code >= 400:
- self.logger.error('HipChat API call failed: %s %s',
- response.status_code, response.reason)
+ self.log.error('HipChat API call failed: %s %s',
+ response.status_code, response.reason)
raise AirflowException('HipChat API call failed: %s %s' %
(response.status_code, response.reason))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index fdbfede..4d8943b 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -22,9 +22,9 @@ from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from apiclient import errors
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def _create_prediction_input(project_id,
@@ -225,7 +225,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
model_name, version_name, uri, max_worker_count,
runtime_version)
except ValueError as e:
- self.logger.error(
+ self.log.error(
'Cannot create batch prediction job request due to: %s',
e
)
@@ -251,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
raise
if finished_prediction_job['state'] != 'SUCCEEDED':
- self.logger.error(
+ self.log.error(
'Batch prediction job failed: %s',
str(finished_prediction_job))
raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -538,8 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
}
if self._mode == 'DRY_RUN':
- self.logger.info('In dry_run mode.')
- self.logger.info('MLEngine Training job request is: {}'.format(training_request))
+ self.log.info('In dry_run mode.')
+ self.log.info('MLEngine Training job request is: {}'.format(training_request))
return
hook = MLEngineHook(
@@ -557,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
raise
if finished_training_job['state'] != 'SUCCEEDED':
- self.logger.error('MLEngine training job failed: {}'.format(
+ self.log.error('MLEngine training job failed: {}'.format(
str(finished_training_job)))
raise RuntimeError(finished_training_job['errorMessage'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f7b3a5a..c8ebcd0 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -168,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
'mode': field_mode,
})
- self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
+ self.log.info('Using schema for %s: %s', self.schema_filename, schema)
tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
json.dump(schema, tmp_schema_file_handle)
return {self.schema_filename: tmp_schema_file_handle}