Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/19 08:17:30 UTC
[1/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
Repository: incubator-airflow
Updated Branches:
refs/heads/master 8e253c750 -> eb2f58909
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/logging_mixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
new file mode 100644
index 0000000..a3aad5b
--- /dev/null
+++ b/airflow/utils/log/logging_mixin.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import logging
+import warnings
+from builtins import object
+
+
+class LoggingMixin(object):
+ """
+ Convenience super-class to have a logger configured with the class name
+ """
+
+ # We want to deprecate the logger property in Airflow 2.0
+ # The log property is the de facto standard in most programming languages
+ @property
+ def logger(self):
+ warnings.warn(
+ 'Initializing logger for {} using logger(), which will '
+ 'be replaced by .log in Airflow 2.0'.format(
+ self.__class__.__module__ + '.' + self.__class__.__name__
+ ),
+ DeprecationWarning
+ )
+ return self.log
+
+ @property
+ def log(self):
+ try:
+ return self._log
+ except AttributeError:
+ self._log = logging.root.getChild(
+ self.__class__.__module__ + '.' + self.__class__.__name__
+ )
+ return self._log
+
+ def set_log_contexts(self, task_instance):
+ """
+ Set the context for all handlers of current logger.
+ """
+ for handler in self.log.handlers:
+ try:
+ handler.set_context(task_instance)
+ except AttributeError:
+ pass
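For context, a minimal sketch of how the mixin above is consumed (not part of this commit; MyHook is a hypothetical class used only for illustration). Any subclass gets a lazily created self.log child logger named after its module and class, while the old .logger spelling still works but emits a DeprecationWarning:

    import logging
    import warnings

    from airflow.utils.log.logging_mixin import LoggingMixin

    class MyHook(LoggingMixin):
        def fetch(self):
            # First access creates logging.root.getChild('__main__.MyHook')
            self.log.info('fetching data')

    logging.basicConfig(level=logging.INFO)
    hook = MyHook()
    hook.fetch()

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        hook.logger.info('still works, but deprecated')
        assert issubclass(caught[0].category, DeprecationWarning)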
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 71fc149..2ed97a1 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -14,7 +14,7 @@
import os
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -36,7 +36,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
from airflow.hooks.S3_hook import S3Hook
return S3Hook(remote_conn_id)
except:
- self.logger.error(
+ self.log.error(
'Could not create an S3Hook with connection id "%s". '
'Please make sure that airflow[s3] is installed and '
'the S3 connection exists.', remote_conn_id
@@ -132,7 +132,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
# return error if needed
if return_error:
msg = 'Could not read logs from {}'.format(remote_log_location)
- self.logger.error(msg)
+ self.log.error(msg)
return msg
def s3_write(self, log, remote_log_location, append=True):
@@ -159,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
)
except:
- self.logger.error('Could not write logs to %s', remote_log_location)
+ self.log.error('Could not write logs to %s', remote_log_location)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/timeout.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index 53f2149..e0b3f96 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -20,7 +20,7 @@ from __future__ import unicode_literals
import signal
from airflow.exceptions import AirflowTaskTimeout
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class timeout(LoggingMixin):
@@ -33,7 +33,7 @@ class timeout(LoggingMixin):
self.error_message = error_message
def handle_timeout(self, signum, frame):
- self.logger.error("Process timed out")
+ self.log.error("Process timed out")
raise AirflowTaskTimeout(self.error_message)
def __enter__(self):
@@ -41,12 +41,12 @@ class timeout(LoggingMixin):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
except ValueError as e:
- self.logger.warning("timeout can't be used in the current context")
- self.logger.exception(e)
+ self.log.warning("timeout can't be used in the current context")
+ self.log.exception(e)
def __exit__(self, type, value, traceback):
try:
signal.alarm(0)
except ValueError as e:
- self.logger.warning("timeout can't be used in the current context")
- self.logger.exception(e)
+ self.log.warning("timeout can't be used in the current context")
+ self.log.exception(e)
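A minimal usage sketch of the timeout context manager touched above (not part of this commit; the seconds and error_message arguments are inferred from the attributes visible in the hunk). The SIGALRM handler raises AirflowTaskTimeout when the alarm fires, which only works in the main thread of a Unix process, hence the ValueError fallback in __enter__/__exit__:

    import time

    from airflow.exceptions import AirflowTaskTimeout
    from airflow.utils.timeout import timeout

    try:
        # SIGALRM fires after 1 second and handle_timeout() raises
        with timeout(seconds=1, error_message='Snap. Time is OUT.'):
            time.sleep(5)
    except AirflowTaskTimeout as e:
        print('caught: %s' % e)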
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 4e5892d..b5a3052 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -18,7 +18,7 @@ from airflow.api.common.experimental import trigger_dag as trigger
from airflow.api.common.experimental.get_task import get_task
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www.app import csrf
from flask import (
@@ -27,7 +27,7 @@ from flask import (
)
from datetime import datetime
-_log = LoggingMixin().logger
+_log = LoggingMixin().log
requires_authentication = airflow.api.api_auth.requires_authentication
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index f280713..438a1e2 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -113,7 +113,7 @@ def create_app(config=None, testing=False):
def integrate_plugins():
"""Integrate plugins to the context"""
- log = LoggingMixin().logger
+ log = LoggingMixin().log
from airflow.plugins_manager import (
admin_views, flask_blueprints, menu_links)
for v in admin_views:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/scripts/perf/scheduler_ops_metrics.py
----------------------------------------------------------------------
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index 40e1b36..34b5a83 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -119,9 +119,9 @@ class SchedulerMetricsJob(SchedulerJob):
(datetime.now()-self.start_date).total_seconds() >
MAX_RUNTIME_SECS):
if (len(successful_tis) == num_task_instances):
- self.logger.info("All tasks processed! Printing stats.")
+ self.log.info("All tasks processed! Printing stats.")
else:
- self.logger.info("Test timeout reached. "
+ self.log.info("Test timeout reached. "
"Printing available stats.")
self.print_stats()
set_dags_paused_state(True)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/hooks/test_databricks_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_databricks_hook.py b/tests/contrib/hooks/test_databricks_hook.py
index e091067..3931bd3 100644
--- a/tests/contrib/hooks/test_databricks_hook.py
+++ b/tests/contrib/hooks/test_databricks_hook.py
@@ -111,7 +111,7 @@ class DatabricksHookTest(unittest.TestCase):
@mock.patch('airflow.contrib.hooks.databricks_hook.requests')
def test_do_api_call_with_error_retry(self, mock_requests):
for exception in [requests_exceptions.ConnectionError, requests_exceptions.Timeout]:
- with mock.patch.object(self.hook.logger, 'error') as mock_errors:
+ with mock.patch.object(self.hook.log, 'error') as mock_errors:
mock_requests.reset_mock()
mock_requests.post.side_effect = exception()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 89ad258..7ce6199 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -132,7 +132,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
zone=ZONE,
dag=self.dag
)
- with patch.object(dataproc_task.logger, 'info') as mock_info:
+ with patch.object(dataproc_task.log, 'info') as mock_info:
with self.assertRaises(TypeError) as _:
dataproc_task.execute(None)
mock_info.assert_called_with('Creating cluster: %s', CLUSTER_NAME)
@@ -148,7 +148,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
zone=ZONE,
dag=self.dag
)
- with patch.object(dataproc_task.logger, 'info') as mock_info:
+ with patch.object(dataproc_task.log, 'info') as mock_info:
context = { 'ts_nodash' : 'testnodash'}
rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
@@ -190,7 +190,7 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
project_id=PROJECT_ID,
dag=self.dag
)
- with patch.object(dataproc_task.logger, 'info') as mock_info:
+ with patch.object(dataproc_task.log, 'info') as mock_info:
with self.assertRaises(TypeError) as _:
dataproc_task.execute(None)
mock_info.assert_called_with('Deleting cluster: %s', CLUSTER_NAME)
@@ -205,7 +205,7 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
dag=self.dag
)
- with patch.object(dataproc_task.logger, 'info') as mock_info:
+ with patch.object(dataproc_task.log, 'info') as mock_info:
context = { 'ts_nodash' : 'testnodash'}
rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
index 0e2ed0c..290089b 100644
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ b/tests/contrib/sensors/test_hdfs_sensors.py
@@ -26,8 +26,8 @@ class HdfsSensorFolderTests(unittest.TestCase):
raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
from tests.core import FakeHDFSHook
self.hook = FakeHDFSHook
- self.logger = logging.getLogger()
- self.logger.setLevel(logging.DEBUG)
+ self.log = logging.getLogger()
+ self.log.setLevel(logging.DEBUG)
def test_should_be_empty_directory(self):
"""
@@ -35,9 +35,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
task = HdfsSensorFolder(task_id='Should_be_empty_directory',
filepath='/datadirectory/empty_directory',
be_empty=True,
@@ -58,9 +58,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
filepath='/datadirectory/not_empty_directory',
be_empty=True,
@@ -80,9 +80,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
filepath='/datadirectory/not_empty_directory',
timeout=1,
@@ -102,9 +102,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
filepath='/datadirectory/empty_directory',
timeout=1,
@@ -124,8 +124,8 @@ class HdfsSensorRegexTests(unittest.TestCase):
raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
from tests.core import FakeHDFSHook
self.hook = FakeHDFSHook
- self.logger = logging.getLogger()
- self.logger.setLevel(logging.DEBUG)
+ self.log = logging.getLogger()
+ self.log.setLevel(logging.DEBUG)
def test_should_match_regex(self):
"""
@@ -133,9 +133,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
compiled_regex = re.compile("test[1-2]file")
task = HdfsSensorRegex(task_id='Should_match_the_regex',
filepath='/datadirectory/regex_dir',
@@ -157,9 +157,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
compiled_regex = re.compile("^IDoNotExist")
task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
filepath='/datadirectory/regex_dir',
@@ -180,9 +180,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
compiled_regex = re.compile("test[1-2]file")
task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
filepath='/datadirectory/regex_dir',
@@ -207,9 +207,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
compiled_regex = re.compile("test[1-2]file")
task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
filepath='/datadirectory/regex_dir',
@@ -231,9 +231,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
:return:
"""
# Given
- self.logger.debug('#' * 10)
- self.logger.debug('Running %s', self._testMethodName)
- self.logger.debug('#' * 10)
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
compiled_regex = re.compile("copying_file_\d+.txt")
task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
filepath='/datadirectory/regex_dir',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
index 9ec6cd4..a0e227c 100644
--- a/tests/executors/test_executor.py
+++ b/tests/executors/test_executor.py
@@ -29,8 +29,8 @@ class TestExecutor(BaseExecutor):
super(TestExecutor, self).__init__(*args, **kwargs)
def execute_async(self, key, command, queue=None):
- self.logger.debug("{} running task instances".format(len(self.running)))
- self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+ self.log.debug("{} running task instances".format(len(self.running)))
+ self.log.debug("{} in queue".format(len(self.queued_tasks)))
def heartbeat(self):
session = settings.Session()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 9b256e6..ee67524 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -75,7 +75,7 @@ class TimeoutTestSensor(BaseSensorOperator):
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
time.sleep(self.poke_interval)
- self.logger.info("Success criteria met. Exiting.")
+ self.log.info("Success criteria met. Exiting.")
class SensorTimeoutTest(unittest.TestCase):
@@ -187,7 +187,7 @@ class HttpSensorTests(unittest.TestCase):
poke_interval=1
)
- with mock.patch.object(task.hook.logger, 'error') as mock_errors:
+ with mock.patch.object(task.hook.log, 'error') as mock_errors:
with self.assertRaises(AirflowSensorTimeout):
task.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/test_utils/reset_warning_registry.py
----------------------------------------------------------------------
diff --git a/tests/test_utils/reset_warning_registry.py b/tests/test_utils/reset_warning_registry.py
new file mode 100644
index 0000000..a275a6d
--- /dev/null
+++ b/tests/test_utils/reset_warning_registry.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+
+
+# We need to explicitly clear the warning registry context
+# https://docs.python.org/2/library/warnings.html
+# One thing to be aware of is that if a warning has already been raised because
+# of a once/default rule, then no matter what filters are set the warning will
+# not be seen again unless the warnings registry related to the warning has
+# been cleared.
+#
+# Proposed fix from Stack overflow, which refers to the Python bug-page
+# noqa
+# https://stackoverflow.com/questions/19428761/python-showing-once-warnings-again-resetting-all-warning-registries
+class reset_warning_registry(object):
+ """
+ context manager which archives & clears warning registry for duration of
+ context.
+
+ :param pattern:
+ optional regex pattern, causes manager to only reset modules whose
+ names match this pattern. defaults to ``".*"``.
+ """
+
+ #: regexp for filtering which modules are reset
+ _pattern = None
+
+ #: dict mapping module name -> old registry contents
+ _backup = None
+
+ def __init__(self, pattern=None):
+ self._pattern = re.compile(pattern or ".*")
+
+ def __enter__(self):
+ # archive and clear the __warningregistry__ key for all modules
+ # that match the 'reset' pattern.
+ pattern = self._pattern
+ backup = self._backup = {}
+ for name, mod in list(sys.modules.items()):
+ if pattern.match(name):
+ reg = getattr(mod, "__warningregistry__", None)
+ if reg:
+ backup[name] = reg.copy()
+ reg.clear()
+ return self
+
+ def __exit__(self, *exc_info):
+ # restore warning registry from backup
+ modules = sys.modules
+ backup = self._backup
+ for name, content in backup.items():
+ mod = modules.get(name)
+ if mod is None:
+ continue
+ reg = getattr(mod, "__warningregistry__", None)
+ if reg is None:
+ setattr(mod, "__warningregistry__", content)
+ else:
+ reg.clear()
+ reg.update(content)
+
+ # clear all registry entries that we didn't archive
+ pattern = self._pattern
+ for name, mod in list(modules.items()):
+ if pattern.match(name) and name not in backup:
+ reg = getattr(mod, "__warningregistry__", None)
+ if reg:
+ reg.clear()
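A minimal sketch of the intended usage (not part of this commit): under a 'default' or 'once' filter, the first emission of a warning is recorded in the module's __warningregistry__ and later emissions are suppressed; wrapping the re-test in reset_warning_registry() archives and clears that registry so the same warning can be observed again. This matters mainly on Python 2, where the registry is not invalidated when the filters change:

    import warnings

    from tests.test_utils.reset_warning_registry import reset_warning_registry

    def noisy():
        warnings.warn('deprecated call', DeprecationWarning)

    warnings.simplefilter('default')
    noisy()  # recorded in this module's __warningregistry__

    with reset_warning_registry():
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter('default')
            noisy()  # emitted again because the registry was cleared
        assert len(caught) == 1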
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/utils/log/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_logging.py b/tests/utils/log/test_logging.py
index 7e05c7d..8df6dfc 100644
--- a/tests/utils/log/test_logging.py
+++ b/tests/utils/log/test_logging.py
@@ -41,7 +41,7 @@ class TestS3TaskHandler(unittest.TestCase):
def test_init_raises(self):
self.hook_mock.side_effect = Exception('Failed to connect')
handler = S3TaskHandler()
- with mock.patch.object(handler.logger, 'error') as mock_error:
+ with mock.patch.object(handler.log, 'error') as mock_error:
# Initialize the hook
handler.hook()
mock_error.assert_called_once_with(
@@ -81,7 +81,7 @@ class TestS3TaskHandler(unittest.TestCase):
def test_read_raises_return_error(self):
self.hook_inst_mock.get_key.side_effect = Exception('error')
handler = S3TaskHandler()
- with mock.patch.object(handler.logger, 'error') as mock_error:
+ with mock.patch.object(handler.log, 'error') as mock_error:
result = handler.s3_log_read(
self.remote_log_location,
return_error=True
@@ -102,7 +102,7 @@ class TestS3TaskHandler(unittest.TestCase):
def test_write_raises(self):
self.hook_inst_mock.load_string.side_effect = Exception('error')
handler = S3TaskHandler()
- with mock.patch.object(handler.logger, 'error') as mock_error:
+ with mock.patch.object(handler.log, 'error') as mock_error:
handler.write('text', self.remote_log_location)
msg = 'Could not write logs to %s' % self.remote_log_location
mock_error.assert_called_once_with(msg)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/utils/test_logging_mixin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py
new file mode 100644
index 0000000..bf9e225
--- /dev/null
+++ b/tests/utils/test_logging_mixin.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import warnings
+
+from airflow.operators.bash_operator import BashOperator
+from tests.test_utils.reset_warning_registry import reset_warning_registry
+
+
+class TestLoggingMixin(unittest.TestCase):
+ def setUp(self):
+ warnings.filterwarnings(
+ action='always'
+ )
+
+ def test_log(self):
+ op = BashOperator(
+ task_id='task-1',
+ bash_command='exit 0'
+ )
+ with reset_warning_registry():
+ with warnings.catch_warnings(record=True) as w:
+ # Set to always, because the warning may have been thrown before
+ # Trigger the warning
+ op.logger.info('Some arbitrary line')
+
+ self.assertEqual(len(w), 1)
+
+ warning = w[0]
+ self.assertTrue(issubclass(warning.category, DeprecationWarning))
+ self.assertEqual(
+ 'Initializing logger for airflow.operators.bash_operator.BashOperator'
+ ' using logger(), which will be replaced by .log in Airflow 2.0',
+ str(warning.message)
+ )
+
+ def tearDown(self):
+ warnings.resetwarnings()
[3/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index 5abfc51..44ea66d 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -81,12 +81,12 @@ class SFTPOperator(BaseOperator):
if self.operation.lower() == SFTPOperation.GET:
file_msg = "from {0} to {1}".format(self.remote_filepath,
self.local_filepath)
- self.logger.debug("Starting to transfer %s", file_msg)
+ self.log.debug("Starting to transfer %s", file_msg)
sftp_client.get(self.remote_filepath, self.local_filepath)
else:
file_msg = "from {0} to {1}".format(self.local_filepath,
self.remote_filepath)
- self.logger.debug("Starting to transfer file %s", file_msg)
+ self.log.debug("Starting to transfer file %s", file_msg)
sftp_client.put(self.local_filepath, self.remote_filepath)
except Exception as e:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index fc9cf3b..7a319f2 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -39,6 +39,6 @@ class VerticaOperator(BaseOperator):
self.sql = sql
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
hook.run(self.sql)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 35ff3c6..5e769fc 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -103,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
- self.logger.info("Dumping Vertica query results to local file")
+ self.log.info("Dumping Vertica query results to local file")
conn = vertica.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -119,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- self.logger.info("Loading file into Hive")
+ self.log.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 630cebe..90a3264 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -59,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator):
def poke(self, context):
table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
- self.logger.info('Sensor checks existence of table: %s', table_uri)
+ self.log.info('Sensor checks existence of table: %s', table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index 4ee45f9..e1a9169 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -67,7 +67,7 @@ class DatadogSensor(BaseSensorOperator):
tags=self.tags)
if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
- self.logger.error("Unexpected Datadog result: %s", response)
+ self.log.error("Unexpected Datadog result: %s", response)
raise AirflowException("Datadog returned unexpected result")
if self.response_check:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 034fcb6..3ecaa42 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -36,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator):
response = self.get_emr_response()
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
- self.logger.info('Bad HTTP response: %s', response)
+ self.log.info('Bad HTTP response: %s', response)
return False
state = self.state_from_response(response)
- self.logger.info('Job flow currently %s', state)
+ self.log.info('Job flow currently %s', state)
if state in self.NON_TERMINAL_STATES:
return False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index e5610a1..87b65c8 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -41,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
def get_emr_response(self):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Poking cluster %s', self.job_flow_id)
+ self.log.info('Poking cluster %s', self.job_flow_id)
return emr.describe_cluster(ClusterId=self.job_flow_id)
def state_from_response(self, response):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index e131d77..003d2d1 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -45,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor):
def get_emr_response(self):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
+ self.log.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)
def state_from_response(self, response):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index 2e604e9..bd66c32 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -42,7 +42,7 @@ class FTPSensor(BaseSensorOperator):
def poke(self, context):
with self._create_hook() as hook:
- self.logger.info('Poking for %s', self.path)
+ self.log.info('Poking for %s', self.path)
try:
hook.get_mod_time(self.path)
except ftplib.error_perm as e:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 800c1bd..384e26f 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -54,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
self.delegate_to = delegate_to
def poke(self, context):
- self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+ self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
@@ -116,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
self.delegate_to = delegate_to
def poke(self, context):
- self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+ self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 11e8b07..1893f01 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -28,7 +28,7 @@ class HdfsSensorRegex(HdfsSensor):
:return: Bool depending on the search criteria
"""
sb = self.hook(self.hdfs_conn_id).get_conn()
- self.logger.info(
+ self.log.info(
'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
)
result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
@@ -56,10 +56,10 @@ class HdfsSensorFolder(HdfsSensor):
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
result = self.filter_for_filesize(result, self.file_size)
if self.be_empty:
- self.logger.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
+ self.log.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
return len(result) == 1 and result[0]['path'] == self.filepath
else:
- self.logger.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
+ self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
result.pop(0)
return bool(result) and result[0]['file_type'] == 'f'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 4cbc676..1dc7b50 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -97,7 +97,7 @@ class JiraTicketSensor(JiraSensor):
*args, **kwargs)
def poke(self, context):
- self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
+ self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
self.jira_operator.method_name = "issue"
self.jira_operator.jira_method_args = {
@@ -123,19 +123,19 @@ class JiraTicketSensor(JiraSensor):
and getattr(field_value, 'name'):
result = self.expected_value.lower() == field_value.name.lower()
else:
- self.logger.warning(
+ self.log.warning(
"Not implemented checker for issue field %s which "
"is neither string nor list nor Jira Resource",
self.field
)
except JIRAError as jira_error:
- self.logger.error("Jira error while checking with expected value: %s", jira_error)
+ self.log.error("Jira error while checking with expected value: %s", jira_error)
except Exception as e:
- self.logger.error("Error while checking with expected value %s:", self.expected_value)
- self.logger.exception(e)
+ self.log.error("Error while checking with expected value %s:", self.expected_value)
+ self.log.exception(e)
if result is True:
- self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
+ self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
else:
- self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
+ self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 220d766..6cc314b 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -38,5 +38,5 @@ class RedisKeySensor(BaseSensorOperator):
self.key = key
def poke(self, context):
- self.logger.info('Sensor check existence of key: %s', self.key)
+ self.log.info('Sensor check existence of key: %s', self.key)
return RedisHook(self.redis_conn_id).key_exists(self.key)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 1a54e12..4295a25 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -47,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator):
self.check_options = check_options
def poke(self, context):
- self.logger.info(
+ self.log.info(
'Poking for blob: {self.blob_name}\n'
'in wasb://{self.container_name}'.format(**locals())
)
@@ -85,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator):
self.check_options = check_options
def poke(self, context):
- self.logger.info(
+ self.log.info(
'Poking for prefix: {self.prefix}\n'
'in wasb://{self.container_name}'.format(**locals())
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index 5d2518d..0022fd6 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -72,10 +72,10 @@ class CgroupTaskRunner(BaseTaskRunner):
for path_element in path_split:
name_to_node = {x.name: x for x in node.children}
if path_element not in name_to_node:
- self.logger.debug("Creating cgroup %s in %s", path_element, node.path)
+ self.log.debug("Creating cgroup %s in %s", path_element, node.path)
node = node.create_cgroup(path_element)
else:
- self.logger.debug(
+ self.log.debug(
"Not creating cgroup %s in %s since it already exists",
path_element, node.path
)
@@ -94,20 +94,20 @@ class CgroupTaskRunner(BaseTaskRunner):
for path_element in path_split:
name_to_node = {x.name: x for x in node.children}
if path_element not in name_to_node:
- self.logger.warning("Cgroup does not exist: %s", path)
+ self.log.warning("Cgroup does not exist: %s", path)
return
else:
node = name_to_node[path_element]
# node is now the leaf node
parent = node.parent
- self.logger.debug("Deleting cgroup %s/%s", parent, node.name)
+ self.log.debug("Deleting cgroup %s/%s", parent, node.name)
parent.delete_cgroup(node.name)
def start(self):
# Use bash if it's already in a cgroup
cgroups = self._get_cgroup_names()
if cgroups["cpu"] != "/" or cgroups["memory"] != "/":
- self.logger.debug(
+ self.log.debug(
"Already running in a cgroup (cpu: %s memory: %s) so not creating another one",
cgroups.get("cpu"), cgroups.get("memory")
)
@@ -133,7 +133,7 @@ class CgroupTaskRunner(BaseTaskRunner):
mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
self._created_mem_cgroup = True
if self._mem_mb_limit > 0:
- self.logger.debug(
+ self.log.debug(
"Setting %s with %s MB of memory",
self.mem_cgroup_name, self._mem_mb_limit
)
@@ -143,14 +143,14 @@ class CgroupTaskRunner(BaseTaskRunner):
cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
self._created_cpu_cgroup = True
if self._cpu_shares > 0:
- self.logger.debug(
+ self.log.debug(
"Setting %s with %s CPU shares",
self.cpu_cgroup_name, self._cpu_shares
)
cpu_cgroup_node.controller.shares = self._cpu_shares
# Start the process w/ cgroups
- self.logger.debug(
+ self.log.debug(
"Starting task process with cgroups cpu,memory: %s",
cgroup_name
)
@@ -168,7 +168,7 @@ class CgroupTaskRunner(BaseTaskRunner):
# I wasn't able to track down the root cause of the package install failures, but
# we might want to revisit that approach at some other point.
if return_code == 137:
- self.logger.warning("Task failed with return code of 137. This may indicate "
+ self.log.warning("Task failed with return code of 137. This may indicate "
"that it was killed due to excessive memory usage. "
"Please consider optimizing your task or using the "
"resources argument to reserve more memory for your task")
@@ -176,7 +176,7 @@ class CgroupTaskRunner(BaseTaskRunner):
def terminate(self):
if self.process and psutil.pid_exists(self.process.pid):
- kill_process_tree(self.logger, self.process.pid)
+ kill_process_tree(self.log, self.process.pid)
def on_finish(self):
# Let the OOM watcher thread know we're done to avoid false OOM alarms
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 7812f96..c387eeb 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -19,7 +19,7 @@ from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
DEFAULT_EXECUTOR = None
@@ -41,7 +41,7 @@ def GetDefaultExecutor():
DEFAULT_EXECUTOR = _get_executor(executor_name)
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.info("Using executor %s", executor_name)
return DEFAULT_EXECUTOR
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 1197958..410a558 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -14,7 +14,7 @@
from builtins import range
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
PARALLELISM = configuration.getint('core', 'PARALLELISM')
@@ -46,7 +46,7 @@ class BaseExecutor(LoggingMixin):
def queue_command(self, task_instance, command, priority=1, queue=None):
key = task_instance.key
if key not in self.queued_tasks and key not in self.running:
- self.logger.info("Adding to queue: %s", command)
+ self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, task_instance)
def queue_task_instance(
@@ -99,9 +99,9 @@ class BaseExecutor(LoggingMixin):
else:
open_slots = self.parallelism - len(self.running)
- self.logger.debug("%s running task instances", len(self.running))
- self.logger.debug("%s in queue", len(self.queued_tasks))
- self.logger.debug("%s open slots", open_slots)
+ self.log.debug("%s running task instances", len(self.running))
+ self.log.debug("%s in queue", len(self.queued_tasks))
+ self.log.debug("%s open slots", open_slots)
sorted_queue = sorted(
[(k, v) for k, v in self.queued_tasks.items()],
@@ -122,13 +122,13 @@ class BaseExecutor(LoggingMixin):
self.running[key] = command
self.execute_async(key, command=command, queue=queue)
else:
- self.logger.debug(
+ self.log.debug(
'Task is already running, not sending to executor: %s',
key
)
# Calling child class sync method
- self.logger.debug("Calling the %s sync method", self.__class__)
+ self.log.debug("Calling the %s sync method", self.__class__)
self.sync()
def change_state(self, key, state):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 39c895c..360a276 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -24,7 +24,7 @@ from celery import states as celery_states
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -53,7 +53,7 @@ class CeleryConfig(object):
try:
celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
except AirflowConfigException as e:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Celery Executor will run without SSL")
try:
@@ -76,7 +76,7 @@ app = Celery(
@app.task
def execute_command(command):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.info("Executing command in Celery: %s", command)
try:
subprocess.check_call(command, shell=True)
@@ -99,13 +99,13 @@ class CeleryExecutor(BaseExecutor):
self.last_state = {}
def execute_async(self, key, command, queue=DEFAULT_QUEUE):
- self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
+ self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING
def sync(self):
- self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks))
+ self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
for key, async in list(self.tasks.items()):
try:
state = async.state
@@ -123,11 +123,11 @@ class CeleryExecutor(BaseExecutor):
del self.tasks[key]
del self.last_state[key]
else:
- self.logger.info("Unexpected state: %s", async.state)
+ self.log.info("Unexpected state: %s", async.state)
self.last_state[key] = async.state
except Exception as e:
- self.logger.error("Error syncing the celery executor, ignoring it:")
- self.logger.exception(e)
+ self.log.error("Error syncing the celery executor, ignoring it:")
+ self.log.exception(e)
def end(self, synchronous=False):
if synchronous:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 8a56506..07b8a82 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -53,10 +53,10 @@ class DaskExecutor(BaseExecutor):
if future.done():
key = self.futures[future]
if future.exception():
- self.logger.error("Failed to execute task: %s", repr(future.exception()))
+ self.log.error("Failed to execute task: %s", repr(future.exception()))
self.fail(key)
elif future.cancelled():
- self.logger.error("Failed to execute task")
+ self.log.error("Failed to execute task")
self.fail(key)
else:
self.success(key)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 9730737..f9eceb3 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -20,7 +20,7 @@ from builtins import range
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -40,14 +40,14 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
- self.logger.info("%s running %s", self.__class__.__name__, command)
+ self.log.info("%s running %s", self.__class__.__name__, command)
command = "exec bash -c '{0}'".format(command)
try:
subprocess.check_call(command, shell=True)
state = State.SUCCESS
except subprocess.CalledProcessError as e:
state = State.FAILED
- self.logger.error("Failed to execute task %s.", str(e))
+ self.log.error("Failed to execute task %s.", str(e))
# TODO: Why is this commented out?
# raise e
self.result_queue.put((key, state))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 7d08a4b..a15450d 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -37,14 +37,14 @@ class SequentialExecutor(BaseExecutor):
def sync(self):
for key, command in self.commands_to_run:
- self.logger.info("Executing command: %s", command)
+ self.log.info("Executing command: %s", command)
try:
subprocess.check_call(command, shell=True)
self.change_state(key, State.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, State.FAILED)
- self.logger.error("Failed to execute task %s.", str(e))
+ self.log.error("Failed to execute task %s.", str(e))
self.commands_to_run = []
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 2f7e6ee..c405001 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -16,7 +16,7 @@ from __future__ import division
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
import re
@@ -87,7 +87,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None):
if Config.has_option(cred_section, 'calling_format'):
calling_format = Config.get(cred_section, 'calling_format')
except:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Option Error in parsing s3 config file")
raise
return (access_key, secret_key, calling_format)
@@ -378,7 +378,7 @@ class S3Hook(BaseHook):
offset = chunk * multipart_bytes
bytes = min(multipart_bytes, key_size - offset)
with FileChunkIO(filename, 'r', offset=offset, bytes=bytes) as fp:
- self.logger.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
+ self.log.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
mp.upload_part_from_file(fp, part_num=chunk + 1)
except:
mp.cancel_upload()
@@ -391,7 +391,7 @@ class S3Hook(BaseHook):
key_size = key_obj.set_contents_from_filename(filename,
replace=replace,
encrypt_key=encrypt)
- self.logger.info(
+ self.log.info(
"The key {key} now contains {key_size} bytes".format(**locals())
)
@@ -432,6 +432,6 @@ class S3Hook(BaseHook):
key_size = key_obj.set_contents_from_string(string_data,
replace=replace,
encrypt_key=encrypt)
- self.logger.info(
+ self.log.info(
"The key {key} now contains {key_size} bytes".format(**locals())
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index 4617b98..92313ca 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -23,7 +23,7 @@ import random
from airflow import settings
from airflow.models import Connection
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
@@ -76,7 +76,7 @@ class BaseHook(LoggingMixin):
def get_connection(cls, conn_id):
conn = random.choice(cls.get_connections(conn_id))
if conn.host:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.info("Using connection to: %s", conn.host)
return conn
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 85eebd0..bbdedd7 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -158,7 +158,7 @@ class DbApiHook(BaseHook):
for s in sql:
if sys.version_info[0] < 3:
s = s.encode('utf-8')
- self.logger.info(s)
+ self.log.info(s)
if parameters is not None:
cur.execute(s, parameters)
else:
@@ -216,12 +216,12 @@ class DbApiHook(BaseHook):
cur.execute(sql, values)
if commit_every and i % commit_every == 0:
conn.commit()
- self.logger.info(
+ self.log.info(
"Loaded {i} into {table} rows so far".format(**locals())
)
conn.commit()
- self.logger.info(
+ self.log.info(
"Done loading. Loaded a total of {i} rows".format(**locals()))
@staticmethod
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index af3ae9b..0b13670 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -68,7 +68,7 @@ class DruidHook(BaseHook):
while running:
req_status = requests.get("{0}/{1}/status".format(url, druid_task_id))
- self.logger.info("Job still running for %s seconds...", sec)
+ self.log.info("Job still running for %s seconds...", sec)
sec = sec + 1
@@ -87,4 +87,4 @@ class DruidHook(BaseHook):
else:
raise AirflowException('Could not get status of the job, got %s', status)
- self.logger.info('Successful index')
+ self.log.info('Successful index')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 70d7642..7c73491 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -201,7 +201,7 @@ class HiveCliHook(BaseHook):
hive_cmd.extend(['-f', f.name])
if verbose:
- self.logger.info(" ".join(hive_cmd))
+ self.log.info(" ".join(hive_cmd))
sp = subprocess.Popen(
hive_cmd,
stdout=subprocess.PIPE,
@@ -215,7 +215,7 @@ class HiveCliHook(BaseHook):
break
stdout += line.decode('UTF-8')
if verbose:
- self.logger.info(line.decode('UTF-8').strip())
+ self.log.info(line.decode('UTF-8').strip())
sp.wait()
if sp.returncode:
@@ -246,7 +246,7 @@ class HiveCliHook(BaseHook):
for query in query_set:
query_preview = ' '.join(query.split())[:50]
- self.logger.info("Testing HQL [%s (...)]", query_preview)
+ self.log.info("Testing HQL [%s (...)]", query_preview)
if query_set == insert:
query = other + '; explain ' + query
else:
@@ -255,16 +255,16 @@ class HiveCliHook(BaseHook):
self.run_cli(query, verbose=False)
except AirflowException as e:
message = e.args[0].split('\n')[-2]
- self.logger.info(message)
+ self.log.info(message)
error_loc = re.search('(\d+):(\d+)', message)
if error_loc and error_loc.group(1).isdigit():
l = int(error_loc.group(1))
begin = max(l-2, 0)
end = min(l+3, len(query.split('\n')))
context = '\n'.join(query.split('\n')[begin:end])
- self.logger.info("Context :\n %s", context)
+ self.log.info("Context :\n %s", context)
else:
- self.logger.info("SUCCESS")
+ self.log.info("SUCCESS")
def load_df(
self,
@@ -397,7 +397,7 @@ class HiveCliHook(BaseHook):
hql += "TBLPROPERTIES({tprops})\n"
hql += ";"
hql = hql.format(**locals())
- self.logger.info(hql)
+ self.log.info(hql)
self.run_cli(hql)
hql = "LOAD DATA LOCAL INPATH '{filepath}' "
if overwrite:
@@ -408,7 +408,7 @@ class HiveCliHook(BaseHook):
["{0}='{1}'".format(k, v) for k, v in partition.items()])
hql += "PARTITION ({pvals});"
hql = hql.format(**locals())
- self.logger.info(hql)
+ self.log.info(hql)
self.run_cli(hql)
def kill(self):
@@ -662,7 +662,7 @@ class HiveServer2Hook(BaseHook):
# impyla uses GSSAPI instead of KERBEROS as an auth_mechanism identifier
if auth_mechanism == 'KERBEROS':
- self.logger.warning(
+ self.log.warning(
"Detected deprecated 'KERBEROS' for authMechanism for %s. Please use 'GSSAPI' instead",
self.hiveserver2_conn_id
)
@@ -696,7 +696,7 @@ class HiveServer2Hook(BaseHook):
# may be `SET` or DDL
records = cur.fetchall()
except ProgrammingError:
- self.logger.debug("get_results returned no records")
+ self.log.debug("get_results returned no records")
if records:
results = {
'data': records,
@@ -716,7 +716,7 @@ class HiveServer2Hook(BaseHook):
schema = schema or 'default'
with self.get_conn(schema) as conn:
with conn.cursor() as cur:
- self.logger.info("Running query: %s", hql)
+ self.log.info("Running query: %s", hql)
cur.execute(hql)
schema = cur.description
with open(csv_filepath, 'wb') as f:
@@ -734,8 +734,8 @@ class HiveServer2Hook(BaseHook):
writer.writerows(rows)
i += len(rows)
- self.logger.info("Written %s rows so far.", i)
- self.logger.info("Done. Loaded a total of %s rows.", i)
+ self.log.info("Written %s rows so far.", i)
+ self.log.info("Done. Loaded a total of %s rows.", i)
def get_records(self, hql, schema='default'):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/http_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py
index f168bc8..b8075a0 100644
--- a/airflow/hooks/http_hook.py
+++ b/airflow/hooks/http_hook.py
@@ -82,7 +82,7 @@ class HttpHook(BaseHook):
headers=headers)
prepped_request = session.prepare_request(req)
- self.logger.info("Sending '%s' to url: %s", self.method, url)
+ self.log.info("Sending '%s' to url: %s", self.method, url)
return self.run_and_check(session, prepped_request, extra_options)
def run_and_check(self, session, prepped_request, extra_options):
@@ -107,12 +107,12 @@ class HttpHook(BaseHook):
# Tried rewrapping, but not supported. This way, it's possible
# to get reason and code for failure by checking first 3 chars
# for the code, or do a split on ':'
- self.logger.error("HTTP error: %s", response.reason)
+ self.log.error("HTTP error: %s", response.reason)
if self.method not in ('GET', 'HEAD'):
# The sensor uses GET, so this prevents filling up the log
# with the body every time the GET 'misses'.
# That's ok to do, because GETs should be repeatable and
# all data should be visible in the log (no post data)
- self.logger.error(response.text)
+ self.log.error(response.text)
raise AirflowException(str(response.status_code)+":"+response.reason)
return response
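The comment above is why run_and_check raises AirflowException(str(response.status_code) + ":" + response.reason) rather than re-wrapping the response object: the caller can recover both parts from the message. A minimal caller-side sketch (the raised message is a stand-in for a real failed request):

    from airflow.exceptions import AirflowException

    try:
        raise AirflowException("404:NOT FOUND")  # stands in for a failed HttpHook call
    except AirflowException as exc:
        code, _, reason = str(exc).partition(":")
        print(code, reason)  # -> 404 NOT FOUND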
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/oracle_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py
index f439daa..71c67e0 100644
--- a/airflow/hooks/oracle_hook.py
+++ b/airflow/hooks/oracle_hook.py
@@ -101,11 +101,11 @@ class OracleHook(DbApiHook):
cur.execute(sql)
if i % commit_every == 0:
conn.commit()
- self.logger.info('Loaded {i} into {table} rows so far'.format(**locals()))
+ self.log.info('Loaded {i} rows into {table} so far'.format(**locals()))
conn.commit()
cur.close()
conn.close()
- self.logger.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
+ self.log.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000):
"""A performant bulk insert for cx_Oracle that uses prepared statements via `executemany()`.
@@ -129,13 +129,13 @@ class OracleHook(DbApiHook):
cursor.prepare(prepared_stm)
cursor.executemany(None, row_chunk)
conn.commit()
- self.logger.info('[%s] inserted %s rows', table, row_count)
+ self.log.info('[%s] inserted %s rows', table, row_count)
# Empty chunk
row_chunk = []
# Commit the leftover chunk
cursor.prepare(prepared_stm)
cursor.executemany(None, row_chunk)
conn.commit()
- self.logger.info('[%s] inserted %s rows', table, row_count)
+ self.log.info('[%s] inserted %s rows', table, row_count)
cursor.close()
conn.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 29beb54..276b37a 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -62,7 +62,7 @@ class PigCliHook(BaseHook):
pig_properties_list = self.pig_properties.split()
pig_cmd.extend(pig_properties_list)
if verbose:
- self.logger.info(" ".join(pig_cmd))
+ self.log.info(" ".join(pig_cmd))
sp = subprocess.Popen(
pig_cmd,
stdout=subprocess.PIPE,
@@ -73,7 +73,7 @@ class PigCliHook(BaseHook):
for line in iter(sp.stdout.readline, ''):
stdout += line
if verbose:
- self.logger.info(line.strip())
+ self.log.info(line.strip())
sp.wait()
if sp.returncode:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index e7df328..4510d29 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -17,14 +17,14 @@ from airflow import configuration
from hdfs import InsecureClient, HdfsError
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
_kerberos_security_mode = configuration.get("core", "security") == "kerberos"
if _kerberos_security_mode:
try:
from hdfs.ext.kerberos import KerberosClient
except ImportError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not load the Kerberos extension for the WebHDFSHook.")
raise
from airflow.exceptions import AirflowException
@@ -49,7 +49,7 @@ class WebHDFSHook(BaseHook):
nn_connections = self.get_connections(self.webhdfs_conn_id)
for nn in nn_connections:
try:
- self.logger.debug('Trying namenode %s', nn.host)
+ self.log.debug('Trying namenode %s', nn.host)
connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
if _kerberos_security_mode:
client = KerberosClient(connection_str)
@@ -57,10 +57,10 @@ class WebHDFSHook(BaseHook):
proxy_user = self.proxy_user or nn.login
client = InsecureClient(connection_str, user=proxy_user)
client.status('/')
- self.logger.debug('Using namenode %s for hook', nn.host)
+ self.log.debug('Using namenode %s for hook', nn.host)
return client
except HdfsError as e:
- self.logger.debug(
+ self.log.debug(
"Read operation on namenode {nn.host} failed witg error: {e.message}".format(**locals())
)
nn_hosts = [c.host for c in nn_connections]
@@ -101,4 +101,4 @@ class WebHDFSHook(BaseHook):
overwrite=overwrite,
n_threads=parallelism,
**kwargs)
- self.logger.debug("Uploaded file %s to %s", source, destination)
+ self.log.debug("Uploaded file %s to %s", source, destination)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
index 4634b22..533e9d0 100644
--- a/airflow/hooks/zendesk_hook.py
+++ b/airflow/hooks/zendesk_hook.py
@@ -37,7 +37,7 @@ class ZendeskHook(BaseHook):
"""
retry_after = int(
rate_limit_exception.response.headers.get('Retry-After', 60))
- self.logger.info(
+ self.log.info(
"Hit Zendesk API rate limit. Pausing for %s seconds",
retry_after
)
@@ -75,7 +75,7 @@ class ZendeskHook(BaseHook):
# `github.zendesk...`
# in it, but the call function needs it removed.
next_url = next_page.split(self.__url)[1]
- self.logger.info("Calling %s", next_url)
+ self.log.info("Calling %s", next_url)
more_res = zendesk.call(next_url)
results.extend(more_res[key])
if next_page == more_res['next_page']:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f855320..3c79ed9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -52,7 +52,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
list_py_file_paths)
from airflow.utils.db import provide_session, pessimistic_connection_handling
from airflow.utils.email import send_email
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
Base = models.Base
@@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin):
try:
self.on_kill()
except:
- self.logger.error('on_kill() method failed')
+ self.log.error('on_kill() method failed')
session.merge(job)
session.commit()
session.close()
@@ -179,7 +179,7 @@ class BaseJob(Base, LoggingMixin):
self.heartbeat_callback(session=session)
session.close()
- self.logger.debug('[heart] Boom.')
+ self.log.debug('[heart] Boom.')
def run(self):
Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
@@ -271,7 +271,7 @@ class BaseJob(Base, LoggingMixin):
["{}".format(x) for x in reset_tis])
session.commit()
- self.logger.info(
+ self.log.info(
"Reset the following %s TaskInstances:\n\t%s",
len(reset_tis), task_instance_str
)
@@ -358,7 +358,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
# responsive file tailing
parent_dir, _ = os.path.split(log_file)
- _log = LoggingMixin().logger
+ _log = LoggingMixin().log
# Create the parent directory for the log file if necessary.
if not os.path.isdir(parent_dir):
@@ -438,7 +438,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
# Arbitrarily wait 5s for the process to die
self._process.join(5)
if sigkill and self._process.is_alive():
- self.logger.warning("Killing PID %s", self._process.pid)
+ self.log.warning("Killing PID %s", self._process.pid)
os.kill(self._process.pid, signal.SIGKILL)
@property
@@ -478,7 +478,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
if not self._result_queue.empty():
self._result = self._result_queue.get_nowait()
self._done = True
- self.logger.debug("Waiting for %s", self._process)
+ self.log.debug("Waiting for %s", self._process)
self._process.join()
return True
@@ -488,7 +488,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
# Get the object from the queue or else join() can hang.
if not self._result_queue.empty():
self._result = self._result_queue.get_nowait()
- self.logger.debug("Waiting for %s", self._process)
+ self.log.debug("Waiting for %s", self._process)
self._process.join()
return True
@@ -578,7 +578,7 @@ class SchedulerJob(BaseJob):
self.using_sqlite = False
if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
if self.max_threads > 1:
- self.logger.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
+ self.log.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
self.max_threads = 1
self.using_sqlite = True
@@ -610,7 +610,7 @@ class SchedulerJob(BaseJob):
tasks that should have succeeded in the past hour.
"""
if not any([ti.sla for ti in dag.tasks]):
- self.logger.info(
+ self.log.info(
"Skipping SLA check for %s because no tasks in DAG have SLAs",
dag
)
@@ -693,7 +693,7 @@ class SchedulerJob(BaseJob):
notification_sent = False
if dag.sla_miss_callback:
# Execute the alert callback
- self.logger.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
+ self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
email_content = """\
@@ -843,7 +843,7 @@ class SchedulerJob(BaseJob):
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
next_run_date = dag.normalize_schedule(min(task_start_dates))
- self.logger.debug(
+ self.log.debug(
"Next run date based on tasks %s",
next_run_date
)
@@ -863,7 +863,7 @@ class SchedulerJob(BaseJob):
if next_run_date == dag.start_date:
next_run_date = dag.normalize_schedule(dag.start_date)
- self.logger.debug(
+ self.log.debug(
"Dag start date: %s. Next run date: %s",
dag.start_date, next_run_date
)
@@ -914,17 +914,17 @@ class SchedulerJob(BaseJob):
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
active_dag_runs = []
for run in dag_runs:
- self.logger.info("Examining DAG run %s", run)
+ self.log.info("Examining DAG run %s", run)
# don't consider runs that are executed in the future
if run.execution_date > datetime.now():
- self.logger.error(
+ self.log.error(
"Execution date is in future: %s",
run.execution_date
)
continue
if len(active_dag_runs) >= dag.max_active_runs:
- self.logger.info("Active dag runs > max_active_run.")
+ self.log.info("Active dag runs > max_active_run.")
continue
# skip backfill dagruns for now as long as they are not really scheduled
@@ -941,7 +941,7 @@ class SchedulerJob(BaseJob):
active_dag_runs.append(run)
for run in active_dag_runs:
- self.logger.debug("Examining active DAG run: %s", run)
+ self.log.debug("Examining active DAG run: %s", run)
# this needs a fresh session sometimes tis get detached
tis = run.get_task_instances(state=(State.NONE,
State.UP_FOR_RETRY))
@@ -962,7 +962,7 @@ class SchedulerJob(BaseJob):
if ti.are_dependencies_met(
dep_context=DepContext(flag_upstream_failed=True),
session=session):
- self.logger.debug('Queuing task: %s', ti)
+ self.log.debug('Queuing task: %s', ti)
queue.append(ti.key)
session.close()
@@ -1020,7 +1020,7 @@ class SchedulerJob(BaseJob):
session.commit()
if tis_changed > 0:
- self.logger.warning(
+ self.log.warning(
"Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
tis_changed, new_state
)
@@ -1069,13 +1069,13 @@ class SchedulerJob(BaseJob):
task_instances_to_examine = ti_query.all()
if len(task_instances_to_examine) == 0:
- self.logger.info("No tasks to consider for execution.")
+ self.log.info("No tasks to consider for execution.")
return executable_tis
# Put one task instance on each line
task_instance_str = "\n\t".join(
["{}".format(x) for x in task_instances_to_examine])
- self.logger.info("Tasks up for execution:\n\t%s", task_instance_str)
+ self.log.info("Tasks up for execution:\n\t%s", task_instance_str)
# Get the pool settings
pools = {p.pool: p for p in session.query(models.Pool).all()}
@@ -1096,7 +1096,7 @@ class SchedulerJob(BaseJob):
open_slots = pools[pool].open_slots(session=session)
num_queued = len(task_instances)
- self.logger.info(
+ self.log.info(
"Figuring out tasks to run in Pool(name={pool}) with {open_slots} "
"open slots and {num_queued} task instances in queue".format(
**locals()
@@ -1111,7 +1111,7 @@ class SchedulerJob(BaseJob):
for task_instance in priority_sorted_task_instances:
if open_slots <= 0:
- self.logger.info(
+ self.log.info(
"Not scheduling since there are %s open slots in pool %s",
open_slots, pool
)
@@ -1133,12 +1133,12 @@ class SchedulerJob(BaseJob):
current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id]
task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency
- self.logger.info(
+ self.log.info(
"DAG %s has %s/%s running and queued tasks",
dag_id, current_task_concurrency, task_concurrency_limit
)
if current_task_concurrency >= task_concurrency_limit:
- self.logger.info(
+ self.log.info(
"Not executing %s since the number of tasks running or queued from DAG %s"
" is >= to the DAG's task concurrency limit of %s",
task_instance, dag_id, task_concurrency_limit
@@ -1146,7 +1146,7 @@ class SchedulerJob(BaseJob):
continue
if self.executor.has_task(task_instance):
- self.logger.debug(
+ self.log.debug(
"Not handling task %s as the executor reports it is running",
task_instance.key
)
@@ -1157,7 +1157,7 @@ class SchedulerJob(BaseJob):
task_instance_str = "\n\t".join(
["{}".format(x) for x in executable_tis])
- self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+ self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
# so these dont expire on commit
for ti in executable_tis:
copy_dag_id = ti.dag_id
@@ -1208,7 +1208,7 @@ class SchedulerJob(BaseJob):
.with_for_update()
.all())
if len(tis_to_set_to_queued) == 0:
- self.logger.info("No tasks were able to have their state changed to queued.")
+ self.log.info("No tasks were able to have their state changed to queued.")
session.commit()
return []
@@ -1236,7 +1236,7 @@ class SchedulerJob(BaseJob):
task_instance_str = "\n\t".join(
["{}".format(x) for x in tis_to_be_queued])
- self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+ self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
return tis_to_be_queued
def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
@@ -1268,7 +1268,7 @@ class SchedulerJob(BaseJob):
priority = task_instance.priority_weight
queue = task_instance.queue
- self.logger.info(
+ self.log.info(
"Sending %s to executor with priority %s and queue %s",
task_instance.key, priority, queue
)
@@ -1357,18 +1357,18 @@ class SchedulerJob(BaseJob):
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
if dag.is_paused:
- self.logger.info("Not processing DAG %s since it's paused", dag.dag_id)
+ self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
continue
if not dag:
- self.logger.error("DAG ID %s was not found in the DagBag", dag.dag_id)
+ self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
continue
- self.logger.info("Processing %s", dag.dag_id)
+ self.log.info("Processing %s", dag.dag_id)
dag_run = self.create_dag_run(dag)
if dag_run:
- self.logger.info("Created %s", dag_run)
+ self.log.info("Created %s", dag_run)
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)
@@ -1384,7 +1384,7 @@ class SchedulerJob(BaseJob):
"""
for key, executor_state in list(self.executor.get_event_buffer().items()):
dag_id, task_id, execution_date = key
- self.logger.info(
+ self.log.info(
"Executor reports %s.%s execution_date=%s as %s",
dag_id, task_id, execution_date, executor_state
)
@@ -1453,10 +1453,10 @@ class SchedulerJob(BaseJob):
"\n" +
"=" * 80)
- self.logger.info(log_str)
+ self.log.info(log_str)
def _execute(self):
- self.logger.info("Starting the scheduler")
+ self.log.info("Starting the scheduler")
pessimistic_connection_handling()
# DAGs can be pickled for easier remote execution by some executors
@@ -1469,16 +1469,16 @@ class SchedulerJob(BaseJob):
# DAGs in parallel. By processing them in separate processes,
# we can get parallelism and isolation from potentially harmful
# user code.
- self.logger.info("Processing files using up to %s processes at a time", self.max_threads)
- self.logger.info("Running execute loop for %s seconds", self.run_duration)
- self.logger.info("Processing each file at most %s times", self.num_runs)
- self.logger.info("Process each file at most once every %s seconds", self.file_process_interval)
- self.logger.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
+ self.log.info("Processing files using up to %s processes at a time", self.max_threads)
+ self.log.info("Running execute loop for %s seconds", self.run_duration)
+ self.log.info("Processing each file at most %s times", self.num_runs)
+ self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
+ self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
# Build up a list of Python files that could contain DAGs
- self.logger.info("Searching for files in %s", self.subdir)
+ self.log.info("Searching for files in %s", self.subdir)
known_file_paths = list_py_file_paths(self.subdir)
- self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+ self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
def processor_factory(file_path, log_file_path):
return DagFileProcessor(file_path,
@@ -1497,7 +1497,7 @@ class SchedulerJob(BaseJob):
try:
self._execute_helper(processor_manager)
finally:
- self.logger.info("Exited execute loop")
+ self.log.info("Exited execute loop")
# Kill all child processes on exit since we don't want to leave
# them as orphaned.
@@ -1511,22 +1511,22 @@ class SchedulerJob(BaseJob):
child_processes = [x for x in this_process.children(recursive=True)
if x.is_running() and x.pid in pids_to_kill]
for child in child_processes:
- self.logger.info("Terminating child PID: %s", child.pid)
+ self.log.info("Terminating child PID: %s", child.pid)
child.terminate()
# TODO: Remove magic number
timeout = 5
- self.logger.info("Waiting up to %s seconds for processes to exit...", timeout)
+ self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
try:
psutil.wait_procs(child_processes, timeout)
except psutil.TimeoutExpired:
- self.logger.debug("Ran out of time while waiting for processes to exit")
+ self.log.debug("Ran out of time while waiting for processes to exit")
# Then SIGKILL
child_processes = [x for x in this_process.children(recursive=True)
if x.is_running() and x.pid in pids_to_kill]
if len(child_processes) > 0:
for child in child_processes:
- self.logger.info("Killing child PID: %s", child.pid)
+ self.log.info("Killing child PID: %s", child.pid)
child.kill()
child.wait()
@@ -1539,7 +1539,7 @@ class SchedulerJob(BaseJob):
self.executor.start()
session = settings.Session()
- self.logger.info("Resetting orphaned tasks for active dag runs")
+ self.log.info("Resetting orphaned tasks for active dag runs")
self.reset_state_for_orphaned_tasks(session=session)
session.close()
@@ -1558,7 +1558,7 @@ class SchedulerJob(BaseJob):
# For the execute duration, parse and schedule DAGs
while (datetime.now() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
- self.logger.debug("Starting Loop...")
+ self.log.debug("Starting Loop...")
loop_start_time = time.time()
# Traverse the DAG directory for Python files containing DAGs
@@ -1568,23 +1568,23 @@ class SchedulerJob(BaseJob):
if elapsed_time_since_refresh > self.dag_dir_list_interval:
# Build up a list of Python files that could contain DAGs
- self.logger.info("Searching for files in %s", self.subdir)
+ self.log.info("Searching for files in %s", self.subdir)
known_file_paths = list_py_file_paths(self.subdir)
last_dag_dir_refresh_time = datetime.now()
- self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+ self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
processor_manager.set_file_paths(known_file_paths)
- self.logger.debug("Removing old import errors")
+ self.log.debug("Removing old import errors")
self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
# Kick of new processes and collect results from finished ones
- self.logger.info("Heartbeating the process manager")
+ self.log.info("Heartbeating the process manager")
simple_dags = processor_manager.heartbeat()
if self.using_sqlite:
# For the sqlite case w/ 1 thread, wait until the processor
# is finished to avoid concurrent access to the DB.
- self.logger.debug("Waiting for processors to finish since we're using sqlite")
+ self.log.debug("Waiting for processors to finish since we're using sqlite")
processor_manager.wait_until_finished()
# Send tasks for execution if available
@@ -1613,7 +1613,7 @@ class SchedulerJob(BaseJob):
(State.SCHEDULED,))
# Call heartbeats
- self.logger.info("Heartbeating the executor")
+ self.log.info("Heartbeating the executor")
self.executor.heartbeat()
# Process events from the executor
@@ -1623,7 +1623,7 @@ class SchedulerJob(BaseJob):
time_since_last_heartbeat = (datetime.now() -
last_self_heartbeat_time).total_seconds()
if time_since_last_heartbeat > self.heartrate:
- self.logger.info("Heartbeating the scheduler")
+ self.log.info("Heartbeating the scheduler")
self.heartbeat()
last_self_heartbeat_time = datetime.now()
@@ -1636,13 +1636,13 @@ class SchedulerJob(BaseJob):
last_stat_print_time = datetime.now()
loop_end_time = time.time()
- self.logger.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
- self.logger.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
+ self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
+ self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)
# Exit early for a test mode
if processor_manager.max_runs_reached():
- self.logger.info("Exiting loop as all files have been processed %s times", self.num_runs)
+ self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
break
# Stop any processors
@@ -1657,7 +1657,7 @@ class SchedulerJob(BaseJob):
all_files_processed = False
break
if all_files_processed:
- self.logger.info(
+ self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
@@ -1693,21 +1693,21 @@ class SchedulerJob(BaseJob):
:return: a list of SimpleDags made from the Dags found in the file
:rtype: list[SimpleDag]
"""
- self.logger.info("Processing file %s for tasks to queue", file_path)
+ self.log.info("Processing file %s for tasks to queue", file_path)
# As DAGs are parsed from this file, they will be converted into SimpleDags
simple_dags = []
try:
dagbag = models.DagBag(file_path)
except Exception:
- self.logger.exception("Failed at reloading the DAG file %s", file_path)
+ self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr('dag_file_refresh_error', 1, 1)
return []
if len(dagbag.dags) > 0:
- self.logger.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
+ self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
else:
- self.logger.warning("No viable dags retrieved from %s", file_path)
+ self.log.warning("No viable dags retrieved from %s", file_path)
self.update_import_errors(session, dagbag)
return []
@@ -1777,7 +1777,7 @@ class SchedulerJob(BaseJob):
ti.state = State.SCHEDULED
# Also save this task instance to the DB.
- self.logger.info("Creating / updating %s in ORM", ti)
+ self.log.info("Creating / updating %s in ORM", ti)
session.merge(ti)
session.commit()
@@ -1785,11 +1785,11 @@ class SchedulerJob(BaseJob):
try:
self.update_import_errors(session, dagbag)
except Exception:
- self.logger.exception("Error logging import errors!")
+ self.log.exception("Error logging import errors!")
try:
dagbag.kill_zombies()
except Exception:
- self.logger.exception("Error killing zombies!")
+ self.log.exception("Error killing zombies!")
return simple_dags
@@ -1908,22 +1908,22 @@ class BackfillJob(BaseJob):
ti.refresh_from_db()
if ti.state == State.SUCCESS:
ti_status.succeeded.add(key)
- self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+ self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.started.pop(key)
continue
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
- self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+ self.log.debug("Task instance %s skipped. Don't rerun.", ti)
ti_status.started.pop(key)
continue
elif ti.state == State.FAILED:
- self.logger.error("Task instance %s failed", ti)
+ self.log.error("Task instance %s failed", ti)
ti_status.failed.add(key)
ti_status.started.pop(key)
continue
# special case: if the task needs to run again put it back
elif ti.state == State.UP_FOR_RETRY:
- self.logger.warning("Task instance %s is up for retry", ti)
+ self.log.warning("Task instance %s is up for retry", ti)
ti_status.started.pop(key)
ti_status.to_run[key] = ti
# special case: The state of the task can be set to NONE by the task itself
@@ -1932,7 +1932,7 @@ class BackfillJob(BaseJob):
# for that as otherwise those tasks would fall outside of the scope of
# the backfill suddenly.
elif ti.state == State.NONE:
- self.logger.warning(
+ self.log.warning(
"FIXME: task instance %s state was set to none externally or "
"reaching concurrency limits. Re-adding task to queue.",
ti
@@ -1953,7 +1953,7 @@ class BackfillJob(BaseJob):
for key, state in list(executor.get_event_buffer().items()):
if key not in started:
- self.logger.warning(
+ self.log.warning(
"%s state %s not in started=%s",
key, state, started.values()
)
@@ -1962,14 +1962,14 @@ class BackfillJob(BaseJob):
ti = started[key]
ti.refresh_from_db()
- self.logger.debug("Executor state: %s task %s", state, ti)
+ self.log.debug("Executor state: %s task %s", state, ti)
if state == State.FAILED or state == State.SUCCESS:
if ti.state == State.RUNNING or ti.state == State.QUEUED:
msg = ("Executor reports task instance {} finished ({}) "
"although the task says its {}. Was the task "
"killed externally?".format(ti, state, ti.state))
- self.logger.error(msg)
+ self.log.error(msg)
ti.handle_failure(msg)
@provide_session
@@ -2083,9 +2083,9 @@ class BackfillJob(BaseJob):
len(ti_status.skipped),
len(ti_status.deadlocked),
len(ti_status.not_ready))
- self.logger.info(msg)
+ self.log.info(msg)
- self.logger.debug(
+ self.log.debug(
"Finished dag run loop iteration. Remaining tasks %s",
ti_status.to_run.values()
)
@@ -2118,7 +2118,7 @@ class BackfillJob(BaseJob):
while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
len(ti_status.deadlocked) == 0):
- self.logger.debug("*** Clearing out not_ready list ***")
+ self.log.debug("*** Clearing out not_ready list ***")
ti_status.not_ready.clear()
# we need to execute the tasks bottom to top
@@ -2138,12 +2138,12 @@ class BackfillJob(BaseJob):
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))
- self.logger.debug("Task instance to run %s state %s", ti, ti.state)
+ self.log.debug("Task instance to run %s state %s", ti, ti.state)
# guard against externally modified tasks instances or
# in case max concurrency has been reached at task runtime
if ti.state == State.NONE:
- self.logger.warning(
+ self.log.warning(
"FIXME: task instance {} state was set to None externally. This should not happen"
)
ti.set_state(State.SCHEDULED, session=session)
@@ -2152,27 +2152,27 @@ class BackfillJob(BaseJob):
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
ti_status.succeeded.add(key)
- self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+ self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
continue
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
- self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+ self.log.debug("Task instance %s skipped. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
continue
elif ti.state == State.FAILED:
- self.logger.error("Task instance %s failed", ti)
+ self.log.error("Task instance %s failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
continue
elif ti.state == State.UPSTREAM_FAILED:
- self.logger.error("Task instance %s upstream failed", ti)
+ self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
@@ -2194,12 +2194,12 @@ class BackfillJob(BaseJob):
ti.refresh_from_db(lock_for_update=True, session=session)
if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
if executor.has_task(ti):
- self.logger.debug(
+ self.log.debug(
"Task Instance %s already in executor waiting for queue to clear",
ti
)
else:
- self.logger.debug('Sending %s to executor', ti)
+ self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
session.merge(ti)
@@ -2216,7 +2216,7 @@ class BackfillJob(BaseJob):
continue
if ti.state == State.UPSTREAM_FAILED:
- self.logger.error("Task instance %s upstream failed", ti)
+ self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
@@ -2225,14 +2225,14 @@ class BackfillJob(BaseJob):
# special case
if ti.state == State.UP_FOR_RETRY:
- self.logger.debug("Task instance %s retry period not expired yet", ti)
+ self.log.debug("Task instance %s retry period not expired yet", ti)
if key in ti_status.started:
ti_status.started.pop(key)
ti_status.to_run[key] = ti
continue
# all remaining tasks
- self.logger.debug('Adding %s to not_ready', ti)
+ self.log.debug('Adding %s to not_ready', ti)
ti_status.not_ready.add(key)
# execute the tasks in the queue
@@ -2245,7 +2245,7 @@ class BackfillJob(BaseJob):
if (ti_status.not_ready and
ti_status.not_ready == set(ti_status.to_run) and
len(ti_status.started) == 0):
- self.logger.warning(
+ self.log.warning(
"Deadlock discovered for ti_status.to_run=%s",
ti_status.to_run.values()
)
@@ -2364,7 +2364,7 @@ class BackfillJob(BaseJob):
run_dates = self.dag.get_run_dates(start_date=start_date,
end_date=self.bf_end_date)
if len(run_dates) == 0:
- self.logger.info("No run dates were found for the given dates and dag interval.")
+ self.log.info("No run dates were found for the given dates and dag interval.")
return
# picklin'
@@ -2402,7 +2402,7 @@ class BackfillJob(BaseJob):
raise AirflowException(err)
if remaining_dates > 0:
- self.logger.info(
+ self.log.info(
"max_active_runs limit for dag %s has been reached "
" - waiting for other dag runs to finish",
self.dag_id
@@ -2413,7 +2413,7 @@ class BackfillJob(BaseJob):
session.commit()
session.close()
- self.logger.info("Backfill done. Exiting.")
+ self.log.info("Backfill done. Exiting.")
class LocalTaskJob(BaseJob):
@@ -2453,7 +2453,7 @@ class LocalTaskJob(BaseJob):
def signal_handler(signum, frame):
"""Setting kill signal handler"""
- self.logger.error("Killing subprocess")
+ self.log.error("Killing subprocess")
self.on_kill()
raise AirflowException("LocalTaskJob received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
@@ -2466,7 +2466,7 @@ class LocalTaskJob(BaseJob):
ignore_ti_state=self.ignore_ti_state,
job_id=self.id,
pool=self.pool):
- self.logger.info("Task is not able to be run")
+ self.log.info("Task is not able to be run")
return
try:
@@ -2479,7 +2479,7 @@ class LocalTaskJob(BaseJob):
# Monitor the task to see if it's done
return_code = self.task_runner.return_code()
if return_code is not None:
- self.logger.info("Task exited with return code %s", return_code)
+ self.log.info("Task exited with return code %s", return_code)
return
# Periodically heartbeat so that the scheduler doesn't think this
@@ -2489,7 +2489,7 @@ class LocalTaskJob(BaseJob):
last_heartbeat_time = time.time()
except OperationalError:
Stats.incr('local_task_job_heartbeat_failure', 1, 1)
- self.logger.exception(
+ self.log.exception(
"Exception while trying to heartbeat! Sleeping for %s seconds",
self.heartrate
)
@@ -2500,7 +2500,7 @@ class LocalTaskJob(BaseJob):
time_since_last_heartbeat = time.time() - last_heartbeat_time
if time_since_last_heartbeat > heartbeat_time_limit:
Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
- self.logger.error("Heartbeat time limited exceeded!")
+ self.log.error("Heartbeat time limited exceeded!")
raise AirflowException("Time since last heartbeat({:.2f}s) "
"exceeded limit ({}s)."
.format(time_since_last_heartbeat,
@@ -2530,18 +2530,18 @@ class LocalTaskJob(BaseJob):
if ti.state == State.RUNNING:
if not same_hostname:
- self.logger.warning("The recorded hostname {ti.hostname} "
+ self.log.warning("The recorded hostname {ti.hostname} "
"does not match this instance's hostname "
"{fqdn}".format(**locals()))
raise AirflowException("Hostname of job runner does not match")
elif not same_process:
current_pid = os.getpid()
- self.logger.warning("Recorded pid {ti.pid} does not match the current pid "
+ self.log.warning("Recorded pid {ti.pid} does not match the current pid "
"{current_pid}".format(**locals()))
raise AirflowException("PID of job runner does not match")
elif (self.task_runner.return_code() is None
and hasattr(self.task_runner, 'process')):
- self.logger.warning(
+ self.log.warning(
"State of this instance has been externally set to %s. Taking the poison pill.",
ti.state
)
[4/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
Posted by bo...@apache.org.
[AIRFLOW-1604] Rename logger to log
In most popular languages, the variable name log is the de facto
standard for logging. Rename LoggingMixin.py to logging_mixin.py to
comply with Python's snake_case module-naming convention. When the
deprecated .logger property is used, a DeprecationWarning is emitted.
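As a minimal sketch of what this means for callers, assuming the
airflow package from this commit is importable (MyHook is a
hypothetical subclass, not part of the diff):

    import warnings
    from airflow.utils.log.logging_mixin import LoggingMixin

    class MyHook(LoggingMixin):
        def ping(self):
            self.log.info("preferred spelling after this commit")

    hook = MyHook()
    hook.ping()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        hook.logger.info("old spelling still works, but warns")
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)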
Closes #2604 from Fokko/AIRFLOW-1604-logger-to-log
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb2f5890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb2f5890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb2f5890
Branch: refs/heads/master
Commit: eb2f589099b87743482c2eb16261b49e284dcd96
Parents: 8e253c7
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Tue Sep 19 10:17:14 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Sep 19 10:17:14 2017 +0200
----------------------------------------------------------------------
airflow/__init__.py | 4 +-
airflow/api/__init__.py | 4 +-
airflow/api/auth/backend/kerberos_auth.py | 4 +-
airflow/bin/cli.py | 10 +-
airflow/configuration.py | 4 +-
.../auth/backends/github_enterprise_auth.py | 4 +-
airflow/contrib/auth/backends/google_auth.py | 4 +-
airflow/contrib/auth/backends/kerberos_auth.py | 2 +-
airflow/contrib/auth/backends/ldap_auth.py | 4 +-
airflow/contrib/auth/backends/password_auth.py | 4 +-
airflow/contrib/executors/mesos_executor.py | 34 +--
airflow/contrib/hooks/bigquery_hook.py | 24 +--
airflow/contrib/hooks/cloudant_hook.py | 4 +-
airflow/contrib/hooks/databricks_hook.py | 8 +-
airflow/contrib/hooks/datadog_hook.py | 6 +-
airflow/contrib/hooks/datastore_hook.py | 4 +-
airflow/contrib/hooks/ftp_hook.py | 6 +-
airflow/contrib/hooks/gcp_api_base_hook.py | 6 +-
airflow/contrib/hooks/gcp_dataflow_hook.py | 14 +-
airflow/contrib/hooks/gcp_dataproc_hook.py | 18 +-
airflow/contrib/hooks/gcp_mlengine_hook.py | 18 +-
airflow/contrib/hooks/gcs_hook.py | 4 +-
airflow/contrib/hooks/jira_hook.py | 4 +-
airflow/contrib/hooks/qubole_hook.py | 12 +-
airflow/contrib/hooks/redis_hook.py | 6 +-
airflow/contrib/hooks/salesforce_hook.py | 14 +-
airflow/contrib/hooks/spark_sql_hook.py | 6 +-
airflow/contrib/hooks/spark_submit_hook.py | 14 +-
airflow/contrib/hooks/sqoop_hook.py | 10 +-
airflow/contrib/hooks/ssh_hook.py | 14 +-
airflow/contrib/operators/bigquery_operator.py | 2 +-
.../operators/bigquery_table_delete_operator.py | 2 +-
.../contrib/operators/bigquery_to_bigquery.py | 2 +-
airflow/contrib/operators/bigquery_to_gcs.py | 6 +-
.../contrib/operators/databricks_operator.py | 12 +-
airflow/contrib/operators/dataproc_operator.py | 16 +-
.../operators/datastore_export_operator.py | 2 +-
.../operators/datastore_import_operator.py | 2 +-
airflow/contrib/operators/ecs_operator.py | 12 +-
.../contrib/operators/emr_add_steps_operator.py | 4 +-
.../operators/emr_create_job_flow_operator.py | 4 +-
.../emr_terminate_job_flow_operator.py | 4 +-
airflow/contrib/operators/file_to_wasb.py | 2 +-
airflow/contrib/operators/fs_operator.py | 2 +-
.../contrib/operators/gcs_download_operator.py | 4 +-
airflow/contrib/operators/gcs_to_bq.py | 2 +-
airflow/contrib/operators/hipchat_operator.py | 4 +-
airflow/contrib/operators/mlengine_operator.py | 14 +-
airflow/contrib/operators/mysql_to_gcs.py | 2 +-
airflow/contrib/operators/sftp_operator.py | 4 +-
airflow/contrib/operators/vertica_operator.py | 2 +-
airflow/contrib/operators/vertica_to_hive.py | 4 +-
airflow/contrib/sensors/bigquery_sensor.py | 2 +-
airflow/contrib/sensors/datadog_sensor.py | 2 +-
airflow/contrib/sensors/emr_base_sensor.py | 4 +-
airflow/contrib/sensors/emr_job_flow_sensor.py | 2 +-
airflow/contrib/sensors/emr_step_sensor.py | 2 +-
airflow/contrib/sensors/ftp_sensor.py | 2 +-
airflow/contrib/sensors/gcs_sensor.py | 4 +-
airflow/contrib/sensors/hdfs_sensors.py | 6 +-
airflow/contrib/sensors/jira_sensor.py | 14 +-
airflow/contrib/sensors/redis_key_sensor.py | 2 +-
airflow/contrib/sensors/wasb_sensor.py | 4 +-
.../contrib/task_runner/cgroup_task_runner.py | 20 +-
airflow/executors/__init__.py | 4 +-
airflow/executors/base_executor.py | 14 +-
airflow/executors/celery_executor.py | 16 +-
airflow/executors/dask_executor.py | 4 +-
airflow/executors/local_executor.py | 6 +-
airflow/executors/sequential_executor.py | 4 +-
airflow/hooks/S3_hook.py | 10 +-
airflow/hooks/base_hook.py | 4 +-
airflow/hooks/dbapi_hook.py | 6 +-
airflow/hooks/druid_hook.py | 4 +-
airflow/hooks/hive_hooks.py | 26 +--
airflow/hooks/http_hook.py | 6 +-
airflow/hooks/oracle_hook.py | 8 +-
airflow/hooks/pig_hook.py | 4 +-
airflow/hooks/webhdfs_hook.py | 12 +-
airflow/hooks/zendesk_hook.py | 4 +-
airflow/jobs.py | 206 +++++++++----------
airflow/models.py | 126 ++++++------
airflow/operators/bash_operator.py | 14 +-
airflow/operators/check_operator.py | 20 +-
airflow/operators/dagrun_operator.py | 4 +-
airflow/operators/docker_operator.py | 10 +-
airflow/operators/generic_transfer.py | 10 +-
airflow/operators/hive_operator.py | 2 +-
airflow/operators/hive_stats_operator.py | 8 +-
airflow/operators/hive_to_druid.py | 10 +-
airflow/operators/hive_to_mysql.py | 10 +-
airflow/operators/hive_to_samba_operator.py | 4 +-
airflow/operators/http_operator.py | 2 +-
airflow/operators/jdbc_operator.py | 2 +-
airflow/operators/latest_only_operator.py | 12 +-
airflow/operators/mssql_operator.py | 2 +-
airflow/operators/mssql_to_hive.py | 4 +-
airflow/operators/mysql_operator.py | 2 +-
airflow/operators/mysql_to_hive.py | 4 +-
airflow/operators/oracle_operator.py | 2 +-
airflow/operators/pig_operator.py | 2 +-
airflow/operators/postgres_operator.py | 2 +-
airflow/operators/presto_to_mysql.py | 8 +-
airflow/operators/python_operator.py | 36 ++--
airflow/operators/redshift_to_s3_operator.py | 6 +-
airflow/operators/s3_file_transform_operator.py | 12 +-
airflow/operators/s3_to_hive_operator.py | 24 +--
airflow/operators/sensors.py | 34 +--
airflow/operators/slack_operator.py | 2 +-
airflow/operators/sqlite_operator.py | 2 +-
airflow/plugins_manager.py | 4 +-
airflow/security/kerberos.py | 2 +-
airflow/settings.py | 4 +-
airflow/task_runner/base_task_runner.py | 10 +-
airflow/task_runner/bash_task_runner.py | 2 +-
airflow/utils/dag_processing.py | 18 +-
airflow/utils/db.py | 4 +-
airflow/utils/email.py | 4 +-
airflow/utils/log/LoggingMixin.py | 45 ----
airflow/utils/log/gcs_task_handler.py | 8 +-
airflow/utils/log/logging_mixin.py | 61 ++++++
airflow/utils/log/s3_task_handler.py | 8 +-
airflow/utils/timeout.py | 12 +-
airflow/www/api/experimental/endpoints.py | 4 +-
airflow/www/app.py | 2 +-
scripts/perf/scheduler_ops_metrics.py | 4 +-
tests/contrib/hooks/test_databricks_hook.py | 2 +-
.../contrib/operators/test_dataproc_operator.py | 8 +-
tests/contrib/sensors/test_hdfs_sensors.py | 62 +++---
tests/executors/test_executor.py | 4 +-
tests/operators/sensors.py | 4 +-
tests/test_utils/reset_warning_registry.py | 82 ++++++++
tests/utils/log/test_logging.py | 6 +-
tests/utils/test_logging_mixin.py | 50 +++++
134 files changed, 858 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 8844eeb..3c5f24c 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,7 +21,7 @@ in their PYTHONPATH. airflow_login should be based off the
"""
from builtins import object
from airflow import version
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
__version__ = version.version
@@ -41,7 +41,7 @@ login = None
def load_login():
- log = LoggingMixin().logger
+ log = LoggingMixin().log
auth_backend = 'airflow.default_login'
try:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 39edbed..31a303b 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -17,11 +17,11 @@ from airflow.exceptions import AirflowException
from airflow import configuration as conf
from importlib import import_module
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
api_auth = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def load_auth():
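The same module-level idiom recurs in cli.py, configuration.py, and the auth backends below: instantiate the mixin once at import time and share the resulting logger with the module's top-level functions. A minimal sketch, assuming this commit's package layout (the helper name is hypothetical):

    from airflow.utils.log.logging_mixin import LoggingMixin

    # One throwaway mixin instance yields a logger shared by module-level helpers.
    log = LoggingMixin().log

    def load_auth_sketch():  # illustrative stand-in, not the real load_auth()
        log.info("selecting an auth backend")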
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index 73a5aa2..a904d59 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -24,7 +24,7 @@
from future.standard_library import install_aliases
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
install_aliases()
@@ -47,7 +47,7 @@ client_auth = HTTPKerberosAuth(service='airflow')
_SERVICE_NAME = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def init_app(app):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 56f1855..5035a66 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,7 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www.app import cached_app
from sqlalchemy import func
@@ -64,7 +64,7 @@ api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.api_auth.client_auth)
-log = LoggingMixin().logger
+log = LoggingMixin().log
def sigint_handler(sig, frame):
@@ -189,7 +189,7 @@ def trigger_dag(args):
:param args:
:return:
"""
- log = LoggingMixin().logger
+ log = LoggingMixin().log
try:
message = api_client.trigger_dag(dag_id=args.dag_id,
run_id=args.run_id,
@@ -202,7 +202,7 @@ def trigger_dag(args):
def pool(args):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
def _tabulate(pools):
return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
@@ -330,7 +330,7 @@ def run(args, dag=None):
if dag:
args.dag_id = dag.dag_id
- log = LoggingMixin().logger
+ log = LoggingMixin().log
# Load custom airflow config
if args.cfg_path:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index db196f9..ff81d98 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,7 +28,7 @@ import sys
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
@@ -38,7 +38,7 @@ from six.moves import configparser
from airflow.exceptions import AirflowConfigException
-log = LoggingMixin().logger
+log = LoggingMixin().log
# show Airflow's deprecation warnings
warnings.filterwarnings(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 459e9c9..28c3cfc 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -27,9 +27,9 @@ from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
from airflow.configuration import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def get_config_param(param):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index f38f725..e6eab94 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -26,9 +26,9 @@ from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
from airflow import models, configuration, settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def get_config_param(param):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index ffb711f..908ebc9 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,7 +29,7 @@ from flask import url_for, redirect
from airflow import settings
from airflow import models
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 8ce0875..b056851 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -33,13 +33,13 @@ from airflow.configuration import AirflowConfigException
import traceback
import re
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
class AuthenticationError(Exception):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 3ad2a8b..8adb1f4 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,13 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
from airflow import settings
from airflow import models
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
PY3 = version_info[0] == 3
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 19d72ed..8728566 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -14,7 +14,7 @@
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.www.utils import LoginMixin
standard_library.install_aliases()
@@ -65,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
self.task_key_map = {}
def registered(self, driver, frameworkId, masterInfo):
- self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
+ self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
# Import here to work around a circular import error
@@ -86,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
Session.remove()
def reregistered(self, driver, masterInfo):
- self.logger.info("AirflowScheduler re-registered to mesos")
+ self.log.info("AirflowScheduler re-registered to mesos")
def disconnected(self, driver):
- self.logger.info("AirflowScheduler disconnected from mesos")
+ self.log.info("AirflowScheduler disconnected from mesos")
def offerRescinded(self, driver, offerId):
- self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
+ self.log.info("AirflowScheduler offer %s rescinded", str(offerId))
def frameworkMessage(self, driver, executorId, slaveId, message):
- self.logger.info("AirflowScheduler received framework message %s", message)
+ self.log.info("AirflowScheduler received framework message %s", message)
def executorLost(self, driver, executorId, slaveId, status):
- self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
+ self.log.warning("AirflowScheduler executor %s lost", str(executorId))
def slaveLost(self, driver, slaveId):
- self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
+ self.log.warning("AirflowScheduler slave %s lost", str(slaveId))
def error(self, driver, message):
- self.logger.error("AirflowScheduler driver aborted %s", message)
+ self.log.error("AirflowScheduler driver aborted %s", message)
raise AirflowException("AirflowScheduler driver aborted %s" % message)
def resourceOffers(self, driver, offers):
@@ -118,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
elif resource.name == "mem":
offerMem += resource.scalar.value
- self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+ self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
remainingCpus = offerCpus
remainingMem = offerMem
@@ -131,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
self.task_counter += 1
self.task_key_map[str(tid)] = key
- self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
+ self.log.info("Launching task %d using offer %s", tid, offer.id.value)
task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
@@ -161,7 +161,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
driver.launchTasks(offer.id, tasks)
def statusUpdate(self, driver, update):
- self.logger.info(
+ self.log.info(
"Task %s is in state %s, data %s",
update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
)
@@ -171,7 +171,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
except KeyError:
# The map may not contain an item if the framework re-registered after a failover.
# Discard these tasks.
- self.logger.warning("Unrecognised task key %s", update.task_id.value)
+ self.log.warning("Unrecognised task key %s", update.task_id.value)
return
if update.state == mesos_pb2.TASK_FINISHED:
@@ -203,7 +203,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
framework.user = ''
if not configuration.get('mesos', 'MASTER'):
- self.logger.error("Expecting mesos master URL for mesos executor")
+ self.log.error("Expecting mesos master URL for mesos executor")
raise AirflowException("mesos.master not provided for mesos executor")
master = configuration.get('mesos', 'MASTER')
@@ -239,7 +239,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
else:
framework.checkpoint = False
- self.logger.info(
+ self.log.info(
'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
)
@@ -248,10 +248,10 @@ class MesosExecutor(BaseExecutor, LoggingMixin):
if configuration.getboolean('mesos', 'AUTHENTICATE'):
if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
- self.logger.error("Expecting authentication principal in the environment")
+ self.log.error("Expecting authentication principal in the environment")
raise AirflowException("mesos.default_principal not provided in authenticated mode")
if not configuration.get('mesos', 'DEFAULT_SECRET'):
- self.logger.error("Expecting authentication secret in the environment")
+ self.log.error("Expecting authentication secret in the environment")
raise AirflowException("mesos.default_secret not provided in authenticated mode")
credential = mesos_pb2.Credential()
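Every hunk above follows the same pattern: a class that mixes in LoggingMixin switches from the deprecated `logger` property to `log`. A minimal, self-contained sketch of the consumer side of that pattern (assumes airflow is importable after this commit; ExampleScheduler is a hypothetical stand-in, not a real Airflow class):

    import logging

    from airflow.utils.log.logging_mixin import LoggingMixin

    class ExampleScheduler(LoggingMixin):
        def disconnected(self):
            # `self.log` is a logger named after the concrete class, so
            # records are attributed to ExampleScheduler automatically.
            self.log.info("Scheduler disconnected from the master")

    if __name__ == '__main__':
        # A root handler is enough: the per-class logger is a child of
        # the root logger and propagates its records upward.
        logging.basicConfig(level=logging.INFO)
        ExampleScheduler().disconnected()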
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 497fa28..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -32,7 +32,7 @@ from past.builtins import basestring
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
@@ -499,7 +499,7 @@ class BigQueryBaseCursor(LoggingMixin):
"'WRITE_APPEND' or 'WRITE_TRUNCATE'."
)
else:
- self.logger.info(
+ self.log.info(
"Adding experimental "
"'schemaUpdateOptions': {0}".format(schema_update_options)
)
@@ -576,12 +576,12 @@ class BigQueryBaseCursor(LoggingMixin):
)
)
else:
- self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+ self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
time.sleep(5)
except HttpError as err:
if err.resp.status in [500, 503]:
- self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+ self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
time.sleep(5)
else:
raise Exception(
@@ -660,14 +660,14 @@ class BigQueryBaseCursor(LoggingMixin):
datasetId=deletion_dataset,
tableId=deletion_table) \
.execute()
- self.logger.info('Deleted table %s:%s.%s.',
- deletion_project, deletion_dataset, deletion_table)
+ self.log.info('Deleted table %s:%s.%s.',
+ deletion_project, deletion_dataset, deletion_table)
except HttpError:
if not ignore_if_missing:
raise Exception(
'Table deletion failed. Table does not exist.')
else:
- self.logger.info('Table does not exist. Skipping.')
+ self.log.info('Table does not exist. Skipping.')
def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -694,7 +694,7 @@ class BigQueryBaseCursor(LoggingMixin):
for table in tables_list_resp.get('tables', []):
if table['tableReference']['tableId'] == table_id:
# found the table, do update
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s exists, updating.',
project_id, dataset_id, table_id
)
@@ -712,7 +712,7 @@ class BigQueryBaseCursor(LoggingMixin):
# If there is no next page, then the table doesn't exist.
else:
# do insert
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s does not exist. Creating.',
project_id, dataset_id, table_id
)
@@ -759,7 +759,7 @@ class BigQueryBaseCursor(LoggingMixin):
'tableId': view_table}}
# check to see if the view we want to add already exists.
if view_access not in access:
- self.logger.info(
+ self.log.info(
'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project, source_dataset
)
@@ -769,7 +769,7 @@ class BigQueryBaseCursor(LoggingMixin):
body={'access': access}).execute()
else:
# if view is already in access, do nothing.
- self.logger.info(
+ self.log.info(
'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
view_project, view_dataset, view_table, source_project, source_dataset
)
@@ -1032,7 +1032,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
if project_id is None:
if var_name is not None:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.info(
'Project not included in {var}: {input}; using project "{project}"'.format(
var=var_name, input=table_input, project=default_project_id
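Module-level functions such as _split_tablename have no class to mix LoggingMixin into, so they obtain a logger from a throwaway instance instead. A short sketch of that idiom, which recurs in the cloudant, qubole, mlengine, and salesforce changes below (assumes airflow is importable; the function and message are illustrative only):

    from airflow.utils.log.logging_mixin import LoggingMixin

    def free_function(project_id=None):
        if project_id is None:
            # The logger is named after the mixin class itself here,
            # since there is no enclosing class to borrow a name from.
            log = LoggingMixin().log
            log.info('No project given; falling back to a default')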
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index d9db08d..cbb0cca 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -18,7 +18,7 @@ import cloudant
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class CloudantHook(BaseHook):
@@ -35,7 +35,7 @@ class CloudantHook(BaseHook):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.debug(
'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
s
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 7b20433..cd9dc54 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -20,7 +20,7 @@ from airflow.hooks.base_hook import BaseHook
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
try:
from urllib import parse as urlparse
@@ -100,10 +100,10 @@ class DatabricksHook(BaseHook, LoggingMixin):
host=self._parse_host(self.databricks_conn.host),
endpoint=endpoint)
if 'token' in self.databricks_conn.extra_dejson:
- self.logger.info('Using token auth.')
+ self.log.info('Using token auth.')
auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
else:
- self.logger.info('Using basic auth.')
+ self.log.info('Using basic auth.')
auth = (self.databricks_conn.login, self.databricks_conn.password)
if method == 'GET':
request_func = requests.get
@@ -129,7 +129,7 @@ class DatabricksHook(BaseHook, LoggingMixin):
response.content, response.status_code))
except (requests_exceptions.ConnectionError,
requests_exceptions.Timeout) as e:
- self.logger.error(
+ self.log.error(
'Attempt %s API Request to Databricks failed with reason: %s',
attempt_num, e
)
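Note that the rewritten calls keep logging's lazy %-style parameters rather than pre-formatting with str.format(). A small sketch of why that matters, using only the stdlib (names and values are illustrative):

    import logging

    log = logging.getLogger(__name__)
    attempt_num, reason = 3, 'connection timeout'

    # The message is only interpolated if a handler actually emits the
    # record, so calls at suppressed levels cost almost nothing:
    log.error('Attempt %s API Request to Databricks failed with reason: %s',
              attempt_num, reason)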
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 0f5af00..6caf611 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -17,7 +17,7 @@ from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from datadog import initialize, api
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class DatadogHook(BaseHook, LoggingMixin):
@@ -47,7 +47,7 @@ class DatadogHook(BaseHook, LoggingMixin):
if self.app_key is None:
raise AirflowException("app_key must be specified in the Datadog connection details")
- self.logger.info("Setting up api keys for Datadog")
+ self.log.info("Setting up api keys for Datadog")
options = {
'api_key': self.api_key,
'app_key': self.app_key
@@ -56,7 +56,7 @@ class DatadogHook(BaseHook, LoggingMixin):
def validate_response(self, response):
if response['status'] != 'ok':
- self.logger.error("Datadog returned: %s", response)
+ self.log.error("Datadog returned: %s", response)
raise AirflowException("Error status received from Datadog")
def send_metric(self, metric_name, datapoint, tags=None):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 2ff1600..cf98dc7 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -136,8 +136,8 @@ class DatastoreHook(GoogleCloudBaseHook):
result = self.get_operation(name)
state = result['metadata']['common']['state']
if state == 'PROCESSING':
- self.logger.info('Operation is processing. Re-polling state in {} seconds'
- .format(polling_interval_in_seconds))
+ self.log.info('Operation is processing. Re-polling state in {} seconds'
+ .format(polling_interval_in_seconds))
time.sleep(polling_interval_in_seconds)
else:
return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index a6b3181..b1e224d 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -19,7 +19,7 @@ import os.path
from airflow.hooks.base_hook import BaseHook
from past.builtins import basestring
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def mlsd(conn, path="", facts=None):
@@ -167,9 +167,9 @@ class FTPHook(BaseHook, LoggingMixin):
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
- self.logger.info('Retrieving file from FTP: %s', remote_full_path)
+ self.log.info('Retrieving file from FTP: %s', remote_full_path)
conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
- self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
+ self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
if is_path:
output_handle.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 7476c90..28721d3 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -18,7 +18,7 @@ from oauth2client.service_account import ServiceAccountCredentials
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class GoogleCloudBaseHook(BaseHook, LoggingMixin):
@@ -66,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
kwargs['sub'] = self.delegate_to
if not key_path:
- self.logger.info('Getting connection using `gcloud auth` user, since no key file '
+ self.log.info('Getting connection using `gcloud auth` user, since no key file '
'is defined for hook.')
credentials = GoogleCredentials.get_application_default()
else:
@@ -74,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
raise AirflowException('Scope should be defined when using a key file.')
scopes = [s.strip() for s in scope.split(',')]
if key_path.endswith('.json'):
- self.logger.info('Getting connection using a JSON key file.')
+ self.log.info('Getting connection using a JSON key file.')
credentials = ServiceAccountCredentials\
.from_json_keyfile_name(key_path, scopes)
elif key_path.endswith('.p12'):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f5767bd..b1a1e0e 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -19,7 +19,7 @@ import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class _DataflowJob(LoggingMixin):
@@ -48,12 +48,12 @@ class _DataflowJob(LoggingMixin):
job = self._dataflow.projects().jobs().get(projectId=self._project_number,
jobId=self._job_id).execute()
if 'currentState' in job:
- self.logger.info(
+ self.log.info(
'Google Cloud DataFlow job %s is %s',
job['name'], job['currentState']
)
else:
- self.logger.info(
+ self.log.info(
'Google Cloud DataFlow with job_id %s has name %s',
self._job_id, job['name']
)
@@ -75,7 +75,7 @@ class _DataflowJob(LoggingMixin):
elif 'JOB_STATE_PENDING' == self._job['currentState']:
time.sleep(15)
else:
- self.logger.debug(str(self._job))
+ self.log.debug(str(self._job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
self._job['name'], self._job['currentState']))
@@ -109,15 +109,15 @@ class _Dataflow(LoggingMixin):
def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
- self.logger.info("Start waiting for DataFlow process to complete.")
+ self.log.info("Start waiting for DataFlow process to complete.")
while self._proc.poll() is None:
ret = select.select(reads, [], [], 5)
if ret is not None:
for fd in ret[0]:
line = self._line(fd)
- self.logger.debug(line[:-1])
+ self.log.debug(line[:-1])
else:
- self.logger.info("Waiting for DataFlow process to complete.")
+ self.log.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
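_Dataflow.wait_for_done above multiplexes the child's stdout and stderr with select() so it can log a heartbeat even while the process is quiet. A POSIX-only sketch of the same shape (tail_process is a hypothetical helper, not Airflow API; pass any subprocess.Popen created with stdout/stderr pipes):

    import logging
    import os
    import select

    log = logging.getLogger(__name__)

    def tail_process(proc, timeout=5):
        reads = [proc.stdout.fileno(), proc.stderr.fileno()]
        while proc.poll() is None:
            # Wait up to `timeout` seconds for output on either pipe.
            readable, _, _ = select.select(reads, [], [], timeout)
            if readable:
                for fd in readable:
                    chunk = os.read(fd, 4096)
                    if chunk:
                        log.debug(chunk.decode(errors='replace').rstrip())
            else:
                log.info("Waiting for the process to complete.")
        return proc.returncode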
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3a1336e..c964f4c 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -18,7 +18,7 @@ import uuid
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class _DataProcJob(LoggingMixin):
@@ -30,7 +30,7 @@ class _DataProcJob(LoggingMixin):
region='global',
body=job).execute()
self.job_id = self.job['reference']['jobId']
- self.logger.info(
+ self.log.info(
'DataProc job %s is %s',
self.job_id, str(self.job['status']['state'])
)
@@ -43,20 +43,20 @@ class _DataProcJob(LoggingMixin):
jobId=self.job_id).execute()
if 'ERROR' == self.job['status']['state']:
print(str(self.job))
- self.logger.error('DataProc job %s has errors', self.job_id)
- self.logger.error(self.job['status']['details'])
- self.logger.debug(str(self.job))
+ self.log.error('DataProc job %s has errors', self.job_id)
+ self.log.error(self.job['status']['details'])
+ self.log.debug(str(self.job))
return False
if 'CANCELLED' == self.job['status']['state']:
print(str(self.job))
- self.logger.warning('DataProc job %s is cancelled', self.job_id)
+ self.log.warning('DataProc job %s is cancelled', self.job_id)
if 'details' in self.job['status']:
- self.logger.warning(self.job['status']['details'])
- self.logger.debug(str(self.job))
+ self.log.warning(self.job['status']['details'])
+ self.log.debug(str(self.job))
return False
if 'DONE' == self.job['status']['state']:
return True
- self.logger.debug(
+ self.log.debug(
'DataProc job %s is %s',
self.job_id, str(self.job['status']['state'])
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 35f31a7..c17b614 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -20,11 +20,11 @@ from apiclient.discovery import build
from oauth2client.client import GoogleCredentials
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
for i in range(0, max_n):
try:
@@ -103,18 +103,18 @@ class MLEngineHook(GoogleCloudBaseHook):
if use_existing_job_fn is not None:
existing_job = self._get_job(project_id, job_id)
if not use_existing_job_fn(existing_job):
- self.logger.error(
+ self.log.error(
'Job with job_id %s already exists, but it does '
'not match our expectation: %s',
job_id, existing_job
)
raise
- self.logger.info(
+ self.log.info(
'Job with job_id %s already exists. Will wait for it to finish',
job_id
)
else:
- self.logger.error('Failed to create MLEngine job: {}'.format(e))
+ self.log.error('Failed to create MLEngine job: {}'.format(e))
raise
return self._wait_for_job_done(project_id, job_id)
@@ -139,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
# polling after 30 seconds when quota failure occurs
time.sleep(30)
else:
- self.logger.error('Failed to get MLEngine job: {}'.format(e))
+ self.log.error('Failed to get MLEngine job: {}'.format(e))
raise
def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -191,10 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
try:
response = request.execute()
- self.logger.info('Successfully set version: %s to default', response)
+ self.log.info('Successfully set version: %s to default', response)
return response
except errors.HttpError as e:
- self.logger.error('Something went wrong: %s', e)
+ self.log.error('Something went wrong: %s', e)
raise
def list_versions(self, project_id, model_name):
@@ -262,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
return request.execute()
except errors.HttpError as e:
if e.resp.status == 404:
- self.logger.error('Model was not found: %s', e)
+ self.log.error('Model was not found: %s', e)
return None
raise
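The helper renamed at the top of this file, _poll_with_exponential_delay, wraps retries around an API request. Its body is not shown in these hunks, so the following is only a rough sketch of that retry shape, with hypothetical is_done/is_error callables and a 2**i backoff assumed rather than taken from the source:

    import logging
    import time

    log = logging.getLogger(__name__)

    def poll_with_exponential_delay(request, max_n, is_done, is_error):
        for i in range(max_n):
            response = request()  # any zero-argument callable
            if is_error(response):
                raise RuntimeError(response)
            if is_done(response):
                return response
            log.info('Not done yet; sleeping %s seconds', 2 ** i)
            time.sleep(2 ** i)
        raise RuntimeError('Gave up after %s attempts' % max_n)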
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index eb17c3b..24c247e 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -182,7 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
ts = ts.replace(tzinfo=dateutil.tz.tzutc())
updated = dateutil.parser.parse(response['updated'])
- self.logger.info("Verify object date: %s > %s", updated, ts)
+ self.log.info("Verify object date: %s > %s", updated, ts)
if updated > ts:
return True
@@ -247,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
).execute()
if 'items' not in response:
- self.logger.info("No items found for prefix: %s", prefix)
+ self.log.info("No items found for prefix: %s", prefix)
break
for item in response['items']:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 8702608..21e669f 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -16,7 +16,7 @@ from jira.exceptions import JIRAError
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class JiraHook(BaseHook, LoggingMixin):
@@ -35,7 +35,7 @@ class JiraHook(BaseHook, LoggingMixin):
def get_conn(self):
if not self.client:
- self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
+ self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
get_server_info = True
validate = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 1a5e7ec..833c1c7 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -21,7 +21,7 @@ import six
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from qds_sdk.qubole import Qubole
@@ -86,7 +86,7 @@ class QuboleHook(BaseHook, LoggingMixin):
if cmd_id is not None:
cmd = Command.find(cmd_id)
if cmd is not None:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
if cmd.status == 'done':
log.info('Command ID: %s has succeeded, hence marking this '
'TI as Success.', cmd_id)
@@ -99,7 +99,7 @@ class QuboleHook(BaseHook, LoggingMixin):
args = self.cls.parse(self.create_cmd_args(context))
self.cmd = self.cls.create(**args)
context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
- self.logger.info(
+ self.log.info(
"Qubole command created with Id: %s and Status: %s",
self.cmd.id, self.cmd.status
)
@@ -107,10 +107,10 @@ class QuboleHook(BaseHook, LoggingMixin):
while not Command.is_done(self.cmd.status):
time.sleep(Qubole.poll_interval)
self.cmd = self.cls.find(self.cmd.id)
- self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+ self.log.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
- self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+ self.log.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
if self.cmd.status != 'done':
raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -126,7 +126,7 @@ class QuboleHook(BaseHook, LoggingMixin):
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
self.cmd = self.cls.find(cmd_id)
if self.cls and self.cmd:
- self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+ self.log.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
self.cmd.cancel()
def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index a8999d6..278e196 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -19,7 +19,7 @@ from redis import StrictRedis
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class RedisHook(BaseHook, LoggingMixin):
@@ -41,7 +41,7 @@ class RedisHook(BaseHook, LoggingMixin):
self.password = conn.password
self.db = int(conn.extra_dejson.get('db', 0))
- self.logger.debug(
+ self.log.debug(
'''Connection "{conn}":
\thost: {host}
\tport: {port}
@@ -59,7 +59,7 @@ class RedisHook(BaseHook, LoggingMixin):
Returns a Redis connection.
"""
if not self.client:
- self.logger.debug(
+ self.log.debug(
'generating Redis client for conn_id "%s" on %s:%s:%s',
self.redis_conn_id, self.host, self.port, self.db
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index f2b5fef..0d0a104 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -29,7 +29,7 @@ import json
import pandas as pd
import time
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SalesforceHook(BaseHook, LoggingMixin):
@@ -92,10 +92,10 @@ class SalesforceHook(BaseHook, LoggingMixin):
"""
self.sign_in()
- self.logger.info("Querying for all objects")
+ self.log.info("Querying for all objects")
query = self.sf.query_all(query)
- self.logger.info(
+ self.log.info(
"Received results: Total size: %s; Done: %s",
query['totalSize'], query['done']
)
@@ -144,7 +144,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
field_string = self._build_field_list(fields)
query = "SELECT {0} FROM {1}".format(field_string, obj)
- self.logger.info(
+ self.log.info(
"Making query to Salesforce: %s",
query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
)
@@ -169,7 +169,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
try:
col = pd.to_datetime(col)
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning(
"Could not convert field to timestamps: %s", col.name
)
@@ -265,7 +265,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
# for each returned record
object_name = query_results[0]['attributes']['type']
- self.logger.info("Coercing timestamps for: %s", object_name)
+ self.log.info("Coercing timestamps for: %s", object_name)
schema = self.describe_object(object_name)
@@ -299,7 +299,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
# there are also a ton of newline objects
# that mess up our ability to write to csv
# we remove these newlines so that the output is a valid CSV format
- self.logger.info("Cleaning data and writing to CSV")
+ self.log.info("Cleaning data and writing to CSV")
possible_strings = df.columns[df.dtypes == "object"]
df[possible_strings] = df[possible_strings].apply(
lambda x: x.str.replace("\r\n", "")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index aa16130..6973023 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -16,7 +16,7 @@ import subprocess
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SparkSqlHook(BaseHook, LoggingMixin):
@@ -121,7 +121,7 @@ class SparkSqlHook(BaseHook, LoggingMixin):
connection_cmd += ["--queue", self._yarn_queue]
connection_cmd += cmd
- self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
+ self.log.debug("Spark-Sql cmd: %s", connection_cmd)
return connection_cmd
@@ -151,5 +151,5 @@ class SparkSqlHook(BaseHook, LoggingMixin):
def kill(self):
if self._sp and self._sp.poll() is None:
- self.logger.info("Killing the Spark-Sql job")
+ self.log.info("Killing the Spark-Sql job")
self._sp.kill()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index bdd1efe..7d59cd2 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -18,7 +18,7 @@ import re
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -123,7 +123,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
conn_data['spark_home'] = extra.get('spark-home', None)
conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
except AirflowException:
- self.logger.debug(
+ self.log.debug(
"Could not load connection string %s, defaulting to %s",
self._conn_id, conn_data['master']
)
@@ -192,7 +192,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
if self._application_args:
connection_cmd += self._application_args
- self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
+ self.log.debug("Spark-Submit cmd: %s", connection_cmd)
return connection_cmd
@@ -239,15 +239,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
self._yarn_application_id = match.groups()[0]
# Pass to logging
- self.logger.info(line)
+ self.log.info(line)
def on_kill(self):
if self._sp and self._sp.poll() is None:
- self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
+ self.log.info('Sending kill signal to %s', self._connection['spark_binary'])
self._sp.kill()
if self._yarn_application_id:
- self.logger.info('Killing application on YARN')
+ self.log.info('Killing application on YARN')
kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.logger.info("YARN killed with return code: %s", yarn_kill.wait())
+ self.log.info("YARN killed with return code: %s", yarn_kill.wait())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 0584df4..5b00b15 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -20,7 +20,7 @@ import subprocess
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SqoopHook(BaseHook, LoggingMixin):
@@ -76,7 +76,7 @@ class SqoopHook(BaseHook, LoggingMixin):
password_index = cmd.index('--password')
cmd[password_index + 1] = 'MASKED'
except ValueError:
- self.logger.debug("No password in sqoop cmd")
+ self.log.debug("No password in sqoop cmd")
return cmd
def Popen(self, cmd, **kwargs):
@@ -87,18 +87,18 @@ class SqoopHook(BaseHook, LoggingMixin):
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
"""
- self.logger.info("Executing command: %s", ' '.join(cmd))
+ self.log.info("Executing command: %s", ' '.join(cmd))
sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
for line in iter(sp.stdout):
- self.logger.info(line.strip())
+ self.log.info(line.strip())
sp.wait()
- self.logger.info("Command exited with return code %s", sp.returncode)
+ self.log.info("Command exited with return code %s", sp.returncode)
if sp.returncode:
raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index 3fe9146..b061fd7 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -23,7 +23,7 @@ import paramiko
from contextlib import contextmanager
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SSHHook(BaseHook, LoggingMixin):
@@ -70,7 +70,7 @@ class SSHHook(BaseHook, LoggingMixin):
def get_conn(self):
if not self.client:
- self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
+ self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
if self.ssh_conn_id is not None:
conn = self.get_connection(self.ssh_conn_id)
if self.username is None:
@@ -98,7 +98,7 @@ class SSHHook(BaseHook, LoggingMixin):
# Auto detecting username values from system
if not self.username:
- self.logger.debug(
+ self.log.debug(
"username to ssh to host: %s is not specified for connection id"
" %s. Using system's default provided by getpass.getuser()",
self.remote_host, self.ssh_conn_id
@@ -142,17 +142,17 @@ class SSHHook(BaseHook, LoggingMixin):
self.client = client
except paramiko.AuthenticationException as auth_error:
- self.logger.error(
+ self.log.error(
"Auth failed while connecting to host: %s, error: %s",
self.remote_host, auth_error
)
except paramiko.SSHException as ssh_error:
- self.logger.error(
+ self.log.error(
"Failed connecting to host: %s, error: %s",
self.remote_host, ssh_error
)
except Exception as error:
- self.logger.error(
+ self.log.error(
"Error connecting to host: %s, error: %s",
self.remote_host, error
)
@@ -191,7 +191,7 @@ class SSHHook(BaseHook, LoggingMixin):
]
ssh_cmd += ssh_tunnel_cmd
- self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
+ self.log.debug("Creating tunnel with cmd: %s", ssh_cmd)
proc = subprocess.Popen(ssh_cmd,
stdin=subprocess.PIPE,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 37e4a97..a2ba824 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -87,7 +87,7 @@ class BigQueryOperator(BaseOperator):
self.query_params = query_params
def execute(self, context):
- self.logger.info('Executing: %s', self.bql)
+ self.log.info('Executing: %s', self.bql)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 21de7cc..0f4ef50 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -53,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
self.ignore_if_missing = ignore_if_missing
def execute(self, context):
- self.logger.info('Deleting: %s', self.deletion_dataset_table)
+ self.log.info('Deleting: %s', self.deletion_dataset_table)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 8e21270..2bc4a8b 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -68,7 +68,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info(
+ self.log.info(
'Executing copy of %s into: %s',
self.source_project_dataset_tables, self.destination_project_dataset_table
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 23a2029..800e7bd 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -79,9 +79,9 @@ class BigQueryToCloudStorageOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info('Executing extract of %s into: %s',
- self.source_project_dataset_table,
- self.destination_cloud_storage_uris)
+ self.log.info('Executing extract of %s into: %s',
+ self.source_project_dataset_table,
+ self.destination_cloud_storage_uris)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8773357..cffc4ff 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -214,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
raise AirflowException(msg)
def _log_run_page_url(self, url):
- self.logger.info('View run status, Spark UI, and logs at %s', url)
+ self.log.info('View run status, Spark UI, and logs at %s', url)
def get_hook(self):
return DatabricksHook(
@@ -225,13 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
hook = self.get_hook()
self.run_id = hook.submit_run(self.json)
run_page_url = hook.get_run_page_url(self.run_id)
- self.logger.info('Run submitted with run_id: %s', self.run_id)
+ self.log.info('Run submitted with run_id: %s', self.run_id)
self._log_run_page_url(run_page_url)
while True:
run_state = hook.get_run_state(self.run_id)
if run_state.is_terminal:
if run_state.is_successful:
- self.logger.info('%s completed successfully.', self.task_id)
+ self.log.info('%s completed successfully.', self.task_id)
self._log_run_page_url(run_page_url)
return
else:
@@ -240,15 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
s=run_state)
raise AirflowException(error_message)
else:
- self.logger.info('%s in run state: %s', self.task_id, run_state)
+ self.log.info('%s in run state: %s', self.task_id, run_state)
self._log_run_page_url(run_page_url)
- self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
+ self.log.info('Sleeping for %s seconds.', self.polling_period_seconds)
time.sleep(self.polling_period_seconds)
def on_kill(self):
hook = self.get_hook()
hook.cancel_run(self.run_id)
- self.logger.info(
+ self.log.info(
'Task: %s with run_id: %s was requested to be cancelled.',
self.task_id, self.run_id
)
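DatabricksSubmitRunOperator.execute polls the run until it reaches a terminal state, logging between passes. A stripped-down sketch of that loop (get_state is a hypothetical callable returning an object with is_terminal/is_successful flags, standing in for the hook's RunState):

    import logging
    import time

    log = logging.getLogger(__name__)

    def wait_for_terminal_state(get_state, polling_period_seconds=30):
        while True:
            run_state = get_state()
            if run_state.is_terminal:
                return run_state.is_successful
            log.info('Sleeping for %s seconds.', polling_period_seconds)
            time.sleep(polling_period_seconds)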
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3c22b60..bdb0335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -177,12 +177,12 @@ class DataprocClusterCreateOperator(BaseOperator):
while True:
state = self._get_cluster_state(service)
if state is None:
- self.logger.info("No state for cluster '%s'", self.cluster_name)
+ self.log.info("No state for cluster '%s'", self.cluster_name)
time.sleep(15)
else:
- self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
+ self.log.info("State for cluster '%s' is %s", self.cluster_name, state)
if self._cluster_ready(state, service):
- self.logger.info(
+ self.log.info(
"Cluster '%s' successfully created", self.cluster_name
)
return
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
return cluster_data
def execute(self, context):
- self.logger.info('Creating cluster: %s', self.cluster_name)
+ self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -272,7 +272,7 @@ class DataprocClusterCreateOperator(BaseOperator):
service = hook.get_conn()
if self._get_cluster(service):
- self.logger.info(
+ self.log.info(
'Cluster %s already exists... Checking status...',
self.cluster_name
)
@@ -290,7 +290,7 @@ class DataprocClusterCreateOperator(BaseOperator):
# probably two cluster start commands at the same time
time.sleep(10)
if self._get_cluster(service):
- self.logger.info(
+ self.log.info(
'Cluster %s already exists... Checking status...',
self.cluster_name
)
@@ -358,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
time.sleep(15)
def execute(self, context):
- self.logger.info('Deleting cluster: %s', self.cluster_name)
+ self.log.info('Deleting cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -371,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
clusterName=self.cluster_name
).execute()
operation_name = response['name']
- self.logger.info("Cluster delete operation name: %s", operation_name)
+ self.log.info("Cluster delete operation name: %s", operation_name)
self._wait_for_done(service, operation_name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 76415e1..51e1d06 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -78,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+ self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
if self.overwrite_existing and self.namespace:
gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 74bd940..d8c42e7 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -72,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
+ self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
file=self.file,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 0c75eaa..898a77a 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -56,11 +56,11 @@ class ECSOperator(BaseOperator):
self.hook = self.get_hook()
def execute(self, context):
- self.logger.info(
+ self.log.info(
'Running ECS Task - Task definition: %s - on cluster %s',
self.task_definition,self.cluster
)
- self.logger.info('ECSOperator overrides: %s', self.overrides)
+ self.log.info('ECSOperator overrides: %s', self.overrides)
self.client = self.hook.get_client_type(
'ecs',
@@ -77,13 +77,13 @@ class ECSOperator(BaseOperator):
failures = response['failures']
if len(failures) > 0:
raise AirflowException(response)
- self.logger.info('ECS Task started: %s', response)
+ self.log.info('ECS Task started: %s', response)
self.arn = response['tasks'][0]['taskArn']
self._wait_for_task_ended()
self._check_success_task()
- self.logger.info('ECS Task has been successfully executed: %s', response)
+ self.log.info('ECS Task has been successfully executed: %s', response)
def _wait_for_task_ended(self):
waiter = self.client.get_waiter('tasks_stopped')
@@ -98,7 +98,7 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
tasks=[self.arn]
)
- self.logger.info('ECS Task stopped, check status: %s', response)
+ self.log.info('ECS Task stopped, check status: %s', response)
if len(response.get('failures', [])) > 0:
raise AirflowException(response)
@@ -124,4 +124,4 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
task=self.arn,
reason='Task killed by the user')
- self.logger.info(response)
+ self.log.info(response)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index dbf764e..227474e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -48,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Adding steps to %s', self.job_flow_id)
+ self.log.info('Adding steps to %s', self.job_flow_id)
response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('Adding steps failed: %s' % response)
else:
- self.logger.info('Steps %s added to JobFlow', response['StepIds'])
+ self.log.info('Steps %s added to JobFlow', response['StepIds'])
return response['StepIds']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 4e40b17..2544adf 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -50,7 +50,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
- self.logger.info(
+ self.log.info(
'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
self.aws_conn_id, self.emr_conn_id
)
@@ -59,5 +59,5 @@ class EmrCreateJobFlowOperator(BaseOperator):
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow creation failed: %s' % response)
else:
- self.logger.info('JobFlow with id %s created', response['JobFlowId'])
+ self.log.info('JobFlow with id %s created', response['JobFlowId'])
return response['JobFlowId']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index df641ad..ec29897 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -43,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- self.logger.info('Terminating JobFlow %s', self.job_flow_id)
+ self.log.info('Terminating JobFlow %s', self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow termination failed: %s' % response)
else:
- self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
+ self.log.info('JobFlow with id %s terminated', self.job_flow_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 4519e1e..3478dd3 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -51,7 +51,7 @@ class FileToWasbOperator(BaseOperator):
def execute(self, context):
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
- self.logger.info(
+ self.log.info(
'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
)
hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index ca7d716..e7640c8 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -48,7 +48,7 @@ class FileSensor(BaseSensorOperator):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = "/".join([basepath, self.filepath])
- self.logger.info('Poking for file {full_path}'.format(**locals()))
+ self.log.info('Poking for file {full_path}'.format(**locals()))
try:
files = [f for f in walk(full_path)]
except:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 27e85b7..53516b1 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -65,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+ self.log.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -74,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
- self.logger.info(file_bytes)
+ self.log.info(file_bytes)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 01f53cc..730a3bc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -189,7 +189,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.destination_project_dataset_table))
row = cursor.fetchone()
max_id = row[0] if row[0] else 0
- self.logger.info(
+ self.log.info(
'Loaded BQ data with max %s.%s=%s',
self.destination_project_dataset_table, self.max_id_key, max_id
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 19c6d76..d82ad61 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -66,8 +66,8 @@ class HipChatAPIOperator(BaseOperator):
'Authorization': 'Bearer %s' % self.token},
data=self.body)
if response.status_code >= 400:
- self.logger.error('HipChat API call failed: %s %s',
- response.status_code, response.reason)
+ self.log.error('HipChat API call failed: %s %s',
+ response.status_code, response.reason)
raise AirflowException('HipChat API call failed: %s %s' %
(response.status_code, response.reason))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index fdbfede..4d8943b 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -22,9 +22,9 @@ from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from apiclient import errors
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def _create_prediction_input(project_id,
@@ -225,7 +225,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
model_name, version_name, uri, max_worker_count,
runtime_version)
except ValueError as e:
- self.logger.error(
+ self.log.error(
'Cannot create batch prediction job request due to: %s',
e
)
@@ -251,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
raise
if finished_prediction_job['state'] != 'SUCCEEDED':
- self.logger.error(
+ self.log.error(
'Batch prediction job failed: %s',
str(finished_prediction_job))
raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -538,8 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
}
if self._mode == 'DRY_RUN':
- self.logger.info('In dry_run mode.')
- self.logger.info('MLEngine Training job request is: {}'.format(training_request))
+ self.log.info('In dry_run mode.')
+ self.log.info('MLEngine Training job request is: {}'.format(training_request))
return
hook = MLEngineHook(
@@ -557,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
raise
if finished_training_job['state'] != 'SUCCEEDED':
- self.logger.error('MLEngine training job failed: {}'.format(
+ self.log.error('MLEngine training job failed: {}'.format(
str(finished_training_job)))
raise RuntimeError(finished_training_job['errorMessage'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f7b3a5a..c8ebcd0 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -168,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
'mode': field_mode,
})
- self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
+ self.log.info('Using schema for %s: %s', self.schema_filename, schema)
tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
json.dump(schema, tmp_schema_file_handle)
return {self.schema_filename: tmp_schema_file_handle}
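MySqlToGoogleCloudStorageOperator stages the BigQuery schema as JSON in a NamedTemporaryFile before handing it to the upload step. A self-contained sketch of that staging step (the schema content here is made up for illustration):

    import json
    from tempfile import NamedTemporaryFile

    schema = [{'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'}]
    tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
    json.dump(schema, tmp_schema_file_handle)
    tmp_schema_file_handle.flush()  # make the bytes visible to the uploader
    # ... pass {filename: handle} to the GCS upload, then let the handle
    # close (delete=True removes the temporary file automatically).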
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f690fb4..28dcc04 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -77,7 +77,7 @@ from airflow.utils.operator_resources import Resources
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
Base = declarative_base()
ID_LEN = 250
@@ -184,7 +184,7 @@ class DagBag(BaseDagBag, LoggingMixin):
if executor is None:
executor = GetDefaultExecutor()
dag_folder = dag_folder or settings.DAGS_FOLDER
- self.logger.info("Filling up the DagBag from %s", dag_folder)
+ self.log.info("Filling up the DagBag from %s", dag_folder)
self.dag_folder = dag_folder
self.dags = {}
# the file's last modified timestamp when we last read it
@@ -257,7 +257,7 @@ class DagBag(BaseDagBag, LoggingMixin):
return found_dags
except Exception as e:
- self.logger.exception(e)
+ self.log.exception(e)
return found_dags
mods = []
@@ -269,7 +269,7 @@ class DagBag(BaseDagBag, LoggingMixin):
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
- self.logger.debug("Importing %s", filepath)
+ self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
mod_name = ('unusual_prefix_' +
hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
@@ -283,7 +283,7 @@ class DagBag(BaseDagBag, LoggingMixin):
m = imp.load_source(mod_name, filepath)
mods.append(m)
except Exception as e:
- self.logger.exception("Failed to import: %s", filepath)
+ self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
@@ -294,10 +294,10 @@ class DagBag(BaseDagBag, LoggingMixin):
mod_name, ext = os.path.splitext(mod.filename)
if not head and (ext == '.py' or ext == '.pyc'):
if mod_name == '__init__':
- self.logger.warning("Found __init__.%s at root of %s", ext, filepath)
+ self.log.warning("Found __init__.%s at root of %s", ext, filepath)
if safe_mode:
with zip_file.open(mod.filename) as zf:
- self.logger.debug("Reading %s from %s", mod.filename, filepath)
+ self.log.debug("Reading %s from %s", mod.filename, filepath)
content = zf.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = (
@@ -313,7 +313,7 @@ class DagBag(BaseDagBag, LoggingMixin):
m = importlib.import_module(mod_name)
mods.append(m)
except Exception as e:
- self.logger.exception("Failed to import: %s", filepath)
+ self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
@@ -336,11 +336,11 @@ class DagBag(BaseDagBag, LoggingMixin):
Fails tasks that haven't had a heartbeat in too long
"""
from airflow.jobs import LocalTaskJob as LJ
- self.logger.info("Finding 'running' jobs without a recent heartbeat")
+ self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = TaskInstance
secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
limit_dttm = datetime.now() - timedelta(seconds=secs)
- self.logger.info("Failing jobs without heartbeat after %s", limit_dttm)
+ self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
tis = (
session.query(TI)
@@ -361,7 +361,7 @@ class DagBag(BaseDagBag, LoggingMixin):
task = dag.get_task(ti.task_id)
ti.task = task
ti.handle_failure("{} killed as zombie".format(str(ti)))
- self.logger.info('Marked zombie job %s as failed', ti)
+ self.log.info('Marked zombie job %s as failed', ti)
Stats.incr('zombies_killed')
session.commit()
@@ -381,7 +381,7 @@ class DagBag(BaseDagBag, LoggingMixin):
subdag.parent_dag = dag
subdag.is_subdag = True
self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
- self.logger.debug('Loaded DAG {dag}'.format(**locals()))
+ self.log.debug('Loaded DAG {dag}'.format(**locals()))
def collect_dags(
self,
@@ -439,7 +439,7 @@ class DagBag(BaseDagBag, LoggingMixin):
str([dag.dag_id for dag in found_dags]),
))
except Exception as e:
- self.logger.warning(e)
+ self.log.warning(e)
Stats.gauge(
'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1)
Stats.gauge(
@@ -606,7 +606,7 @@ class Connection(Base, LoggingMixin):
self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_encrypted = True
except AirflowException:
- self.logger.exception("Failed to load fernet while encrypting value, "
+ self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
self._password = value
self.is_encrypted = False
@@ -635,7 +635,7 @@ class Connection(Base, LoggingMixin):
self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_extra_encrypted = True
except AirflowException:
- self.logger.exception("Failed to load fernet while encrypting value, "
+ self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
self._extra = value
self.is_extra_encrypted = False
@@ -706,8 +706,8 @@ class Connection(Base, LoggingMixin):
try:
obj = json.loads(self.extra)
except Exception as e:
- self.logger.exception(e)
- self.logger.error("Failed parsing the json for conn_id %s", self.conn_id)
+ self.log.exception(e)
+ self.log.error("Failed parsing the json for conn_id %s", self.conn_id)
return obj
@@ -1001,7 +1001,7 @@ class TaskInstance(Base, LoggingMixin):
"""
Forces the task instance's state to FAILED in the database.
"""
- self.logger.error("Recording the task instance as FAILED")
+ self.log.error("Recording the task instance as FAILED")
self.state = State.FAILED
session.merge(self)
session.commit()
@@ -1152,7 +1152,7 @@ class TaskInstance(Base, LoggingMixin):
session=session):
failed = True
if verbose:
- self.logger.info(
+ self.log.info(
"Dependencies not met for %s, dependency '%s' FAILED: %s",
self, dep_status.dep_name, dep_status.reason
)
@@ -1161,7 +1161,7 @@ class TaskInstance(Base, LoggingMixin):
return False
if verbose:
- self.logger.info("Dependencies all met for %s", self)
+ self.log.info("Dependencies all met for %s", self)
return True
@@ -1177,7 +1177,7 @@ class TaskInstance(Base, LoggingMixin):
session,
dep_context):
- self.logger.debug(
+ self.log.debug(
"%s dependency '%s' PASSED: %s, %s",
self, dep_status.dep_name, dep_status.passed, dep_status.reason
)
@@ -1354,10 +1354,10 @@ class TaskInstance(Base, LoggingMixin):
"runtime. Attempt {attempt} of {total}. State set to NONE.").format(
attempt=self.try_number + 1,
total=self.max_tries + 1)
- self.logger.warning(hr + msg + hr)
+ self.log.warning(hr + msg + hr)
self.queued_dttm = datetime.now()
- self.logger.info("Queuing into pool %s", self.pool)
+ self.log.info("Queuing into pool %s", self.pool)
session.merge(self)
session.commit()
return False
@@ -1366,12 +1366,12 @@ class TaskInstance(Base, LoggingMixin):
# the current worker process was blocked on refresh_from_db
if self.state == State.RUNNING:
msg = "Task Instance already running {}".format(self)
- self.logger.warning(msg)
+ self.log.warning(msg)
session.commit()
return False
# print status message
- self.logger.info(hr + msg + hr)
+ self.log.info(hr + msg + hr)
self.try_number += 1
if not test_mode:
@@ -1389,10 +1389,10 @@ class TaskInstance(Base, LoggingMixin):
if verbose:
if mark_success:
msg = "Marking success for {} on {}".format(self.task, self.execution_date)
- self.logger.info(msg)
+ self.log.info(msg)
else:
msg = "Executing {} on {}".format(self.task, self.execution_date)
- self.logger.info(msg)
+ self.log.info(msg)
return True
@provide_session
@@ -1434,7 +1434,7 @@ class TaskInstance(Base, LoggingMixin):
def signal_handler(signum, frame):
"""Setting kill signal handler"""
- self.logger.error("Killing subprocess")
+ self.log.error("Killing subprocess")
task_copy.on_kill()
raise AirflowException("Task received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
@@ -1513,8 +1513,8 @@ class TaskInstance(Base, LoggingMixin):
if task.on_success_callback:
task.on_success_callback(context)
except Exception as e3:
- self.logger.error("Failed when executing success callback")
- self.logger.exception(e3)
+ self.log.error("Failed when executing success callback")
+ self.log.exception(e3)
session.commit()
@@ -1559,7 +1559,7 @@ class TaskInstance(Base, LoggingMixin):
task_copy.dry_run()
def handle_failure(self, error, test_mode=False, context=None):
- self.logger.exception(error)
+ self.log.exception(error)
task = self.task
session = settings.Session()
self.end_date = datetime.now()
@@ -1580,20 +1580,20 @@ class TaskInstance(Base, LoggingMixin):
# next task instance try_number exceeds the max_tries.
if task.retries and self.try_number <= self.max_tries:
self.state = State.UP_FOR_RETRY
- self.logger.info('Marking task as UP_FOR_RETRY')
+ self.log.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
self.email_alert(error, is_retry=True)
else:
self.state = State.FAILED
if task.retries:
- self.logger.info('All retries failed; marking task as FAILED')
+ self.log.info('All retries failed; marking task as FAILED')
else:
- self.logger.info('Marking task as FAILED.')
+ self.log.info('Marking task as FAILED.')
if task.email_on_failure and task.email:
self.email_alert(error, is_retry=False)
except Exception as e2:
- self.logger.error('Failed to send email to: %s', task.email)
- self.logger.exception(e2)
+ self.log.error('Failed to send email to: %s', task.email)
+ self.log.exception(e2)
# Handling callbacks pessimistically
try:
@@ -1602,13 +1602,13 @@ class TaskInstance(Base, LoggingMixin):
if self.state == State.FAILED and task.on_failure_callback:
task.on_failure_callback(context)
except Exception as e3:
- self.logger.error("Failed at executing callback")
- self.logger.exception(e3)
+ self.log.error("Failed at executing callback")
+ self.log.exception(e3)
if not test_mode:
session.merge(self)
session.commit()
- self.logger.error(str(error))
+ self.log.error(str(error))
@provide_session
def get_template_context(self, session=None):
@@ -1898,7 +1898,7 @@ class Log(Base):
self.owner = owner or task_owner
-class SkipMixin(object):
+class SkipMixin(LoggingMixin):
def skip(self, dag_run, execution_date, tasks):
"""
Sets tasks instances to skipped from the same dag run.
@@ -1926,7 +1926,7 @@ class SkipMixin(object):
else:
assert execution_date is not None, "Execution date is None and no dag run"
- self.logger.warning("No DAG RUN present this should not happen")
+ self.log.warning("No DAG RUN present this should not happen")
# this is defensive against dag runs that are not complete
for task in tasks:
ti = TaskInstance(task, execution_date=execution_date)
@@ -2121,7 +2121,7 @@ class BaseOperator(LoggingMixin):
self.email_on_failure = email_on_failure
self.start_date = start_date
if start_date and not isinstance(start_date, datetime):
- self.logger.warning("start_date for %s isn't datetime.datetime", self)
+ self.log.warning("start_date for %s isn't datetime.datetime", self)
self.end_date = end_date
if not TriggerRule.is_valid(trigger_rule):
raise AirflowException(
@@ -2137,7 +2137,7 @@ class BaseOperator(LoggingMixin):
self.depends_on_past = True
if schedule_interval:
- self.logger.warning(
+ self.log.warning(
"schedule_interval is used for {}, though it has "
"been deprecated as a task parameter, you need to "
"specify it as a DAG parameter instead",
@@ -2155,7 +2155,7 @@ class BaseOperator(LoggingMixin):
if isinstance(retry_delay, timedelta):
self.retry_delay = retry_delay
else:
- self.logger.debug("Retry_delay isn't timedelta object, assuming secs")
+ self.log.debug("Retry_delay isn't timedelta object, assuming secs")
self.retry_delay = timedelta(seconds=retry_delay)
self.retry_exponential_backoff = retry_exponential_backoff
self.max_retry_delay = max_retry_delay
@@ -2455,7 +2455,7 @@ class BaseOperator(LoggingMixin):
try:
setattr(self, attr, env.loader.get_source(env, content)[0])
except Exception as e:
- self.logger.exception(e)
+ self.log.exception(e)
self.prepare_template()
@property
@@ -2574,12 +2574,12 @@ class BaseOperator(LoggingMixin):
ignore_ti_state=ignore_ti_state)
def dry_run(self):
- self.logger.info('Dry run')
+ self.log.info('Dry run')
for attr in self.template_fields:
content = getattr(self, attr)
if content and isinstance(content, six.string_types):
- self.logger.info('Rendering template for %s', attr)
- self.logger.info(content)
+ self.log.info('Rendering template for %s', attr)
+ self.log.info(content)
def get_direct_relatives(self, upstream=False):
"""
@@ -3517,7 +3517,7 @@ class DAG(BaseDag, LoggingMixin):
d['pickle_len'] = len(pickled)
d['pickling_duration'] = "{}".format(datetime.now() - dttm)
except Exception as e:
- self.logger.debug(e)
+ self.log.debug(e)
d['is_picklable'] = False
d['stacktrace'] = traceback.format_exc()
return d
@@ -3754,7 +3754,7 @@ class DAG(BaseDag, LoggingMixin):
DagModel).filter(DagModel.dag_id == self.dag_id).first()
if not orm_dag:
orm_dag = DagModel(dag_id=self.dag_id)
- self.logger.info("Creating ORM DAG for %s", self.dag_id)
+ self.log.info("Creating ORM DAG for %s", self.dag_id)
orm_dag.fileloc = self.fileloc
orm_dag.is_subdag = self.is_subdag
orm_dag.owners = owner
@@ -3797,11 +3797,11 @@ class DAG(BaseDag, LoggingMixin):
:type expiration_date: datetime
:return: None
"""
- logger = LoggingMixin().logger
+ log = LoggingMixin().log
for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
- logger.info(
+ log.info(
"Deactivating DAG ID %s since it was last touched by the scheduler at %s",
dag.dag_id, dag.last_scheduler_run.isoformat()
)
@@ -3930,7 +3930,7 @@ class Variable(Base, LoggingMixin):
self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_encrypted = True
except AirflowException:
- self.logger.exception(
+ self.log.exception(
"Failed to load fernet while encrypting value, using non-encrypted value."
)
self._val = value
@@ -4052,7 +4052,7 @@ class XCom(Base, LoggingMixin):
try:
value = json.dumps(value).encode('UTF-8')
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4123,7 +4123,7 @@ class XCom(Base, LoggingMixin):
try:
return json.loads(result.value.decode('UTF-8'))
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4173,7 +4173,7 @@ class XCom(Base, LoggingMixin):
try:
result.value = json.loads(result.value.decode('UTF-8'))
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4229,7 +4229,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not update dag stats for %s", dag_id)
log.exception(e)
@@ -4282,7 +4282,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not update dag stat table")
log.exception(e)
@@ -4306,7 +4306,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not create stat record")
log.exception(e)
@@ -4524,7 +4524,7 @@ class DagRun(Base, LoggingMixin):
tis = self.get_task_instances(session=session)
- self.logger.info("Updating state for %s considering %s task(s)", self, len(tis))
+ self.log.info("Updating state for %s considering %s task(s)", self, len(tis))
for ti in list(tis):
# skip in db?
@@ -4570,18 +4570,18 @@ class DagRun(Base, LoggingMixin):
# if all roots finished and at least one failed, the run failed
if (not unfinished_tasks and
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
- self.logger.info('Marking run %s failed', self)
+ self.log.info('Marking run %s failed', self)
self.state = State.FAILED
# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
- self.logger.info('Marking run %s successful', self)
+ self.log.info('Marking run %s successful', self)
self.state = State.SUCCESS
# if *all tasks* are deadlocked, the run failed
elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
- self.logger.info('Deadlock; marking run %s failed', self)
+ self.log.info('Deadlock; marking run %s failed', self)
self.state = State.FAILED
# finally, if the roots aren't done, the dag is still running
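The class-side counterpart of the rename runs through all of models.py above: anything deriving from LoggingMixin (note that SkipMixin now does, too) logs via self.log instead of self.logger. A hedged sketch of the consumer's view, with a made-up class (illustrative only, not part of the commit):

    from airflow.utils.log.logging_mixin import LoggingMixin

    class HeartbeatChecker(LoggingMixin):  # hypothetical class, not in the commit
        def check(self, job_id):
            # self.log is supplied by the mixin; the old self.logger
            # spelling is what every hunk above replaces.
            self.log.info('Checking heartbeat for job %s', job_id)

    HeartbeatChecker().check(42)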
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 63321fb..ff2ed51 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -67,7 +67,7 @@ class BashOperator(BaseOperator):
which will be cleaned afterwards
"""
bash_command = self.bash_command
- self.logger.info("Tmp dir root location: \n %s", gettempdir())
+ self.log.info("Tmp dir root location: \n %s", gettempdir())
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
@@ -75,11 +75,11 @@ class BashOperator(BaseOperator):
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
- self.logger.info(
+ self.log.info(
"Temporary script location: %s",
script_location
)
- self.logger.info("Running command: %s", bash_command)
+ self.log.info("Running command: %s", bash_command)
sp = Popen(
['bash', fname],
stdout=PIPE, stderr=STDOUT,
@@ -88,13 +88,13 @@ class BashOperator(BaseOperator):
self.sp = sp
- self.logger.info("Output:")
+ self.log.info("Output:")
line = ''
for line in iter(sp.stdout.readline, b''):
line = line.decode(self.output_encoding).strip()
- self.logger.info(line)
+ self.log.info(line)
sp.wait()
- self.logger.info(
+ self.log.info(
"Command exited with return code %s",
sp.returncode
)
@@ -106,6 +106,6 @@ class BashOperator(BaseOperator):
return line
def on_kill(self):
- self.logger.info('Sending SIGTERM signal to bash process group')
+ self.log.info('Sending SIGTERM signal to bash process group')
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index f263a2c..ff82539 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -71,15 +71,15 @@ class CheckOperator(BaseOperator):
self.sql = sql
def execute(self, context=None):
- self.logger.info('Executing SQL check: %s', self.sql)
+ self.log.info('Executing SQL check: %s', self.sql)
records = self.get_db_hook().get_first(self.sql)
- self.logger.info('Record: %s', records)
+ self.log.info('Record: %s', records)
if not records:
raise AirflowException("The query returned None")
elif not all([bool(r) for r in records]):
exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}"
raise AirflowException(exceptstr.format(q=self.sql, r=records))
- self.logger.info("Success.")
+ self.log.info("Success.")
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
@@ -134,7 +134,7 @@ class ValueCheckOperator(BaseOperator):
self.has_tolerance = self.tol is not None
def execute(self, context=None):
- self.logger.info('Executing SQL check: %s', self.sql)
+ self.log.info('Executing SQL check: %s', self.sql)
records = self.get_db_hook().get_first(self.sql)
if not records:
raise AirflowException("The query returned None")
@@ -208,9 +208,9 @@ class IntervalCheckOperator(BaseOperator):
def execute(self, context=None):
hook = self.get_db_hook()
- self.logger.info('Executing SQL check: %s', self.sql2)
+ self.log.info('Executing SQL check: %s', self.sql2)
row2 = hook.get_first(self.sql2)
- self.logger.info('Executing SQL check: %s', self.sql1)
+ self.log.info('Executing SQL check: %s', self.sql1)
row1 = hook.get_first(self.sql1)
if not row2:
raise AirflowException("The query {q} returned None".format(q=self.sql2))
@@ -230,20 +230,20 @@ class IntervalCheckOperator(BaseOperator):
else:
ratio = float(max(current[m], reference[m])) / \
min(current[m], reference[m])
- self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
+ self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
ratios[m] = ratio
test_results[m] = ratio < self.metrics_thresholds[m]
if not all(test_results.values()):
failed_tests = [it[0] for it in test_results.items() if not it[1]]
j = len(failed_tests)
n = len(self.metrics_sorted)
- self.logger.warning(countstr.format(**locals()))
+ self.log.warning(countstr.format(**locals()))
for k in failed_tests:
- self.logger.warning(
+ self.log.warning(
fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k])
)
raise AirflowException(estr.format(", ".join(failed_tests)))
- self.logger.info("All tests have passed")
+ self.log.info("All tests have passed")
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index bd2862b..3a952cd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -69,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator):
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
- self.logger.info("Creating DagRun %s", dr)
+ self.log.info("Creating DagRun %s", dr)
session.add(dr)
session.commit()
session.close()
else:
- self.logger.info("Criteria not met, moving on")
+ self.log.info("Criteria not met, moving on")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 8a333d6..3011f1c 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -134,7 +134,7 @@ class DockerOperator(BaseOperator):
self.container = None
def execute(self, context):
- self.logger.info('Starting docker container from image %s', self.image)
+ self.log.info('Starting docker container from image %s', self.image)
tls_config = None
if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
@@ -155,10 +155,10 @@ class DockerOperator(BaseOperator):
image = self.image
if self.force_pull or len(self.cli.images(name=image)) == 0:
- self.logger.info('Pulling docker image %s', image)
+ self.log.info('Pulling docker image %s', image)
for l in self.cli.pull(image, stream=True):
output = json.loads(l.decode('utf-8'))
- self.logger.info("%s", output['status'])
+ self.log.info("%s", output['status'])
cpu_shares = int(round(self.cpus * 1024))
@@ -184,7 +184,7 @@ class DockerOperator(BaseOperator):
line = line.strip()
if hasattr(line, 'decode'):
line = line.decode('utf-8')
- self.logger.info(line)
+ self.log.info(line)
exit_code = self.cli.wait(self.container['Id'])
if exit_code != 0:
@@ -202,5 +202,5 @@ class DockerOperator(BaseOperator):
def on_kill(self):
if self.cli is not None:
- self.logger.info('Stopping docker container')
+ self.log.info('Stopping docker container')
self.cli.stop(self.container['Id'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/generic_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py
index 790749a..c8a2a58 100644
--- a/airflow/operators/generic_transfer.py
+++ b/airflow/operators/generic_transfer.py
@@ -61,15 +61,15 @@ class GenericTransfer(BaseOperator):
def execute(self, context):
source_hook = BaseHook.get_hook(self.source_conn_id)
- self.logger.info("Extracting data from %s", self.source_conn_id)
- self.logger.info("Executing: \n %s", self.sql)
+ self.log.info("Extracting data from %s", self.source_conn_id)
+ self.log.info("Executing: \n %s", self.sql)
results = source_hook.get_records(self.sql)
destination_hook = BaseHook.get_hook(self.destination_conn_id)
if self.preoperator:
- self.logger.info("Running preoperator")
- self.logger.info(self.preoperator)
+ self.log.info("Running preoperator")
+ self.log.info(self.preoperator)
destination_hook.run(self.preoperator)
- self.logger.info("Inserting rows into %s", self.destination_conn_id)
+ self.log.info("Inserting rows into %s", self.destination_conn_id)
destination_hook.insert_rows(table=self.destination_table, rows=results)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 983069b..221feeb 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -93,7 +93,7 @@ class HiveOperator(BaseOperator):
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
def execute(self, context):
- self.logger.info('Executing: %s', self.hql)
+ self.log.info('Executing: %s', self.hql)
self.hook = self.get_hook()
self.hook.run_cli(hql=self.hql, schema=self.schema,
hive_conf=context_to_airflow_vars(context))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_stats_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index 025e427..896547e 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -139,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator):
""".format(**locals())
hook = PrestoHook(presto_conn_id=self.presto_conn_id)
- self.logger.info('Executing SQL check: %s', sql)
+ self.log.info('Executing SQL check: %s', sql)
row = hook.get_first(hql=sql)
- self.logger.info("Record: %s", row)
+ self.log.info("Record: %s", row)
if not row:
raise AirflowException("The query returned None")
part_json = json.dumps(self.partition, sort_keys=True)
- self.logger.info("Deleting rows from previous runs if they exist")
+ self.log.info("Deleting rows from previous runs if they exist")
mysql = MySqlHook(self.mysql_conn_id)
sql = """
SELECT 1 FROM hive_stats
@@ -167,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator):
""".format(**locals())
mysql.run(sql)
- self.logger.info("Pivoting and loading cells into the Airflow db")
+ self.log.info("Pivoting and loading cells into the Airflow db")
rows = [
(self.ds, self.dttm, self.table, part_json) +
(r[0][0], r[0][1], r[1])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index d7b1b82..e420dfd 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -92,7 +92,7 @@ class HiveToDruidTransfer(BaseOperator):
def execute(self, context):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
- self.logger.info("Extracting data from Hive")
+ self.log.info("Extracting data from Hive")
hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
sql = self.sql.strip().strip(';')
tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
@@ -107,7 +107,7 @@ class HiveToDruidTransfer(BaseOperator):
AS
{sql}
""".format(**locals())
- self.logger.info("Running command:\n %s", hql)
+ self.log.info("Running command:\n %s", hql)
hive.run_cli(hql)
m = HiveMetastoreHook(self.metastore_conn_id)
@@ -131,13 +131,13 @@ class HiveToDruidTransfer(BaseOperator):
columns=columns,
)
- self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path)
+ self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
druid.submit_indexing_job(index_spec)
- self.logger.info("Load seems to have succeeded!")
+ self.log.info("Load seems to have succeeded!")
finally:
- self.logger.info(
+ self.log.info(
"Cleaning up by dropping the temp Hive table %s",
hive_table
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index e82a099..d2d9d0c 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -77,7 +77,7 @@ class HiveToMySqlTransfer(BaseOperator):
def execute(self, context):
hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
- self.logger.info("Extracting data from Hive: %s", self.sql)
+ self.log.info("Extracting data from Hive: %s", self.sql)
if self.bulk_load:
tmpfile = NamedTemporaryFile()
@@ -88,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator):
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
- self.logger.info("Running MySQL preoperator")
+ self.log.info("Running MySQL preoperator")
mysql.run(self.mysql_preoperator)
- self.logger.info("Inserting rows into MySQL")
+ self.log.info("Inserting rows into MySQL")
if self.bulk_load:
mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
@@ -100,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator):
mysql.insert_rows(table=self.mysql_table, rows=results)
if self.mysql_postoperator:
- self.logger.info("Running MySQL postoperator")
+ self.log.info("Running MySQL postoperator")
mysql.run(self.mysql_postoperator)
- self.logger.info("Done.")
+ self.log.info("Done.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_samba_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index d6e6dec..93ebec1 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -53,7 +53,7 @@ class Hive2SambaOperator(BaseOperator):
samba = SambaHook(samba_conn_id=self.samba_conn_id)
hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
tmpfile = tempfile.NamedTemporaryFile()
- self.logger.info("Fetching file from Hive")
+ self.log.info("Fetching file from Hive")
hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name)
- self.logger.info("Pushing to samba")
+ self.log.info("Pushing to samba")
samba.push_from_local(self.destination_filepath, tmpfile.name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d92c931..63b892c 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -74,7 +74,7 @@ class SimpleHttpOperator(BaseOperator):
def execute(self, context):
http = HttpHook(self.method, http_conn_id=self.http_conn_id)
- self.logger.info("Calling HTTP method")
+ self.log.info("Calling HTTP method")
response = http.run(self.endpoint,
self.data,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py
index 942e312..4ec2fa0 100644
--- a/airflow/operators/jdbc_operator.py
+++ b/airflow/operators/jdbc_operator.py
@@ -55,6 +55,6 @@ class JdbcOperator(BaseOperator):
self.autocommit = autocommit
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 58f7e67..a1e2a0c 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -32,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
# If the DAG Run is externally triggered, then return without
# skipping downstream tasks
if context['dag_run'] and context['dag_run'].external_trigger:
- self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.")
+ self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
return
now = datetime.datetime.now()
left_window = context['dag'].following_schedule(
context['execution_date'])
right_window = context['dag'].following_schedule(left_window)
- self.logger.info(
+ self.log.info(
'Checking latest only with left_window: %s right_window: %s now: %s',
left_window, right_window, now
)
if not left_window < now <= right_window:
- self.logger.info('Not latest execution, skipping downstream.')
+ self.log.info('Not latest execution, skipping downstream.')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'],
context['ti'].execution_date,
downstream_tasks)
- self.logger.info('Done.')
+ self.log.info('Done.')
else:
- self.logger.info('Latest, allowing execution to proceed.')
+ self.log.info('Latest, allowing execution to proceed.')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py
index bc0822f..3455232 100644
--- a/airflow/operators/mssql_operator.py
+++ b/airflow/operators/mssql_operator.py
@@ -44,7 +44,7 @@ class MsSqlOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id,
schema=self.database)
hook.run(self.sql, autocommit=self.autocommit,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 719ddd2..c2c858d 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -102,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
- self.logger.info("Dumping Microsoft SQL Server query results to local file")
+ self.log.info("Dumping Microsoft SQL Server query results to local file")
conn = mssql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -118,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- self.logger.info("Loading file into Hive")
+ self.log.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 923eaf8..20f1b7e 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -46,7 +46,7 @@ class MySqlOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
hook.run(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index fde92b5..cd472a8 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -110,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
- self.logger.info("Dumping MySQL query results to local file")
+ self.log.info("Dumping MySQL query results to local file")
conn = mysql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -123,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- self.logger.info("Loading file into Hive")
+ self.log.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/oracle_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py
index f87bbf9..9a35267 100644
--- a/airflow/operators/oracle_operator.py
+++ b/airflow/operators/oracle_operator.py
@@ -42,7 +42,7 @@ class OracleOperator(BaseOperator):
self.parameters = parameters
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
hook.run(
self.sql,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/pig_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py
index cdce48a..a4e4e5c 100644
--- a/airflow/operators/pig_operator.py
+++ b/airflow/operators/pig_operator.py
@@ -59,7 +59,7 @@ class PigOperator(BaseOperator):
"(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)
def execute(self, context):
- self.logger.info('Executing: %s', self.pig)
+ self.log.info('Executing: %s', self.pig)
self.hook = self.get_hook()
self.hook.run_cli(pig=self.pig)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/postgres_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index 55c1573..c93dc7b 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -49,7 +49,7 @@ class PostgresOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
schema=self.database)
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/presto_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py
index 48158ca..d0c323a 100644
--- a/airflow/operators/presto_to_mysql.py
+++ b/airflow/operators/presto_to_mysql.py
@@ -61,14 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator):
def execute(self, context):
presto = PrestoHook(presto_conn_id=self.presto_conn_id)
- self.logger.info("Extracting data from Presto: %s", self.sql)
+ self.log.info("Extracting data from Presto: %s", self.sql)
results = presto.get_records(self.sql)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
- self.logger.info("Running MySQL preoperator")
- self.logger.info(self.mysql_preoperator)
+ self.log.info("Running MySQL preoperator")
+ self.log.info(self.mysql_preoperator)
mysql.run(self.mysql_preoperator)
- self.logger.info("Inserting rows into MySQL")
+ self.log.info("Inserting rows into MySQL")
mysql.insert_rows(table=self.mysql_table, rows=results)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 56837ec..718c88f 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -87,7 +87,7 @@ class PythonOperator(BaseOperator):
self.op_kwargs = context
return_value = self.execute_callable()
- self.logger.info("Done. Returned value was: %s", return_value)
+ self.log.info("Done. Returned value was: %s", return_value)
return return_value
def execute_callable(self):
@@ -115,17 +115,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
"""
def execute(self, context):
branch = super(BranchPythonOperator, self).execute(context)
- self.logger.info("Following branch %s", branch)
- self.logger.info("Marking other directly downstream tasks as skipped")
+ self.log.info("Following branch %s", branch)
+ self.log.info("Marking other directly downstream tasks as skipped")
downstream_tasks = context['task'].downstream_list
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
skip_tasks = [t for t in downstream_tasks if t.task_id != branch]
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks)
- self.logger.info("Done.")
+ self.log.info("Done.")
class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -142,21 +142,21 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
"""
def execute(self, context):
condition = super(ShortCircuitOperator, self).execute(context)
- self.logger.info("Condition result is %s", condition)
+ self.log.info("Condition result is %s", condition)
if condition:
- self.logger.info('Proceeding with downstream tasks...')
+ self.log.info('Proceeding with downstream tasks...')
return
- self.logger.info('Skipping downstream tasks...')
+ self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
- self.logger.info("Done.")
+ self.log.info("Done.")
class PythonVirtualenvOperator(PythonOperator):
"""
@@ -233,7 +233,7 @@ class PythonVirtualenvOperator(PythonOperator):
# generate filenames
input_filename = os.path.join(tmp_dir, 'script.in')
output_filename = os.path.join(tmp_dir, 'script.out')
- string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+ string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
script_filename = os.path.join(tmp_dir, 'script.py')
# set up virtualenv
@@ -261,12 +261,12 @@ class PythonVirtualenvOperator(PythonOperator):
def _execute_in_subprocess(self, cmd):
try:
- self.logger.info("Executing cmd\n{}".format(cmd))
+ self.log.info("Executing cmd\n{}".format(cmd))
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if output:
- self.logger.info("Got output\n{}".format(output))
+ self.log.info("Got output\n{}".format(output))
except subprocess.CalledProcessError as e:
- self.logger.info("Got error output\n{}".format(e.output))
+ self.log.info("Got error output\n{}".format(e.output))
raise
def _write_string_args(self, filename):
@@ -294,14 +294,14 @@ class PythonVirtualenvOperator(PythonOperator):
else:
return pickle.load(f)
except ValueError:
- self.logger.error("Error deserializing result. Note that result deserialization "
+ self.log.error("Error deserializing result. Note that result deserialization "
"is not supported across major Python versions.")
raise
def _write_script(self, script_filename):
with open(script_filename, 'w') as f:
python_code = self._generate_python_code()
- self.logger.debug('Writing code to file\n{}'.format(python_code))
+ self.log.debug('Writing code to file\n{}'.format(python_code))
f.write(python_code)
def _generate_virtualenv_cmd(self, tmp_dir):
@@ -323,7 +323,7 @@ class PythonVirtualenvOperator(PythonOperator):
def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename):
# direct path alleviates need to activate
return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename]
-
+
def _generate_python_code(self):
if self.use_dill:
pickling_library = 'dill'
@@ -354,3 +354,5 @@ class PythonVirtualenvOperator(PythonOperator):
python_callable_name=fn.__name__,
pickling_library=pickling_library)
+ self.log.info("Done.")
+
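One readability note on the hunks above: they mix printf-style arguments (self.log.info("Following branch %s", branch)) with eager "{}".format(...) calls. The printf style lets the logging module skip string interpolation entirely when the record is filtered out. A small standalone sketch using plain stdlib logging (illustrative only):

    import logging

    logging.basicConfig(level=logging.INFO)  # so the INFO records below are emitted
    log = logging.getLogger(__name__)
    branch = 'run_ab_test'  # hypothetical value

    log.info('Following branch %s', branch)         # interpolated only if emitted
    log.info('Following branch {}'.format(branch))  # interpolated unconditionally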
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index e25d613..683ff9c 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -70,7 +70,7 @@ class RedshiftToS3Transfer(BaseOperator):
a_key, s_key = self.s3.get_credentials()
unload_options = '\n\t\t\t'.join(self.unload_options)
- self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table)
+ self.log.info("Retrieving headers from %s.%s...", self.schema, self.table)
columns_query = """SELECT column_name
FROM information_schema.columns
@@ -99,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator):
""".format(column_names, column_castings, self.schema, self.table,
self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
- self.logger.info('Executing UNLOAD command...')
+ self.log.info('Executing UNLOAD command...')
self.hook.run(unload_query, self.autocommit)
- self.logger.info("UNLOAD command complete...")
+ self.log.info("UNLOAD command complete...")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 5de5127..68c733c 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -74,12 +74,12 @@ class S3FileTransformOperator(BaseOperator):
def execute(self, context):
source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
- self.logger.info("Downloading source S3 file %s", self.source_s3_key)
+ self.log.info("Downloading source S3 file %s", self.source_s3_key)
if not source_s3.check_for_key(self.source_s3_key):
raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
source_s3_key_object = source_s3.get_key(self.source_s3_key)
with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
- self.logger.info(
+ self.log.info(
"Dumping S3 file %s contents to local file %s",
self.source_s3_key, f_source.name
)
@@ -90,20 +90,20 @@ class S3FileTransformOperator(BaseOperator):
[self.transform_script, f_source.name, f_dest.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
- self.logger.info("Transform script stdout %s", transform_script_stdoutdata)
+ self.log.info("Transform script stdout %s", transform_script_stdoutdata)
if transform_script_process.returncode > 0:
raise AirflowException("Transform script failed %s", transform_script_stderrdata)
else:
- self.logger.info(
+ self.log.info(
"Transform script successful. Output temporarily located at %s",
f_dest.name
)
- self.logger.info("Uploading transformed file to S3")
+ self.log.info("Uploading transformed file to S3")
f_dest.flush()
dest_s3.load_file(
filename=f_dest.name,
key=self.dest_s3_key,
replace=self.replace
)
- self.logger.info("Upload successful")
+ self.log.info("Upload successful")
dest_s3.connection.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 68fe903..2b4aceb 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator):
# Downloading file from S3
self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
- self.logger.info("Downloading S3 file")
+ self.log.info("Downloading S3 file")
if self.wildcard_match:
if not self.s3.check_for_wildcard_key(self.s3_key):
@@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator):
NamedTemporaryFile(mode="w",
dir=tmp_dir,
suffix=file_ext) as f:
- self.logger.info("Dumping S3 key {0} contents to local file {1}"
- .format(s3_key_object.key, f.name))
+ self.log.info("Dumping S3 key {0} contents to local file {1}"
+ .format(s3_key_object.key, f.name))
s3_key_object.get_contents_to_file(f)
f.flush()
self.s3.connection.close()
if not self.headers:
- self.logger.info("Loading file %s into Hive", f.name)
+ self.log.info("Loading file %s into Hive", f.name)
self.hive.load_file(
f.name,
self.hive_table,
@@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator):
else:
# Decompressing file
if self.input_compressed:
- self.logger.info("Uncompressing file %s", f.name)
+ self.log.info("Uncompressing file %s", f.name)
fn_uncompressed = uncompress_file(f.name,
file_ext,
tmp_dir)
- self.logger.info("Uncompressed to %s", fn_uncompressed)
+ self.log.info("Uncompressed to %s", fn_uncompressed)
# uncompressed file available now so deleting
# compressed file to save disk space
f.close()
@@ -178,19 +178,19 @@ class S3ToHiveTransfer(BaseOperator):
# Testing if header matches field_dict
if self.check_headers:
- self.logger.info("Matching file header against field_dict")
+ self.log.info("Matching file header against field_dict")
header_list = self._get_top_row_as_list(fn_uncompressed)
if not self._match_headers(header_list):
raise AirflowException("Header check failed")
# Deleting top header row
- self.logger.info("Removing header from file %s", fn_uncompressed)
+ self.log.info("Removing header from file %s", fn_uncompressed)
headless_file = (
self._delete_top_row_and_compress(fn_uncompressed,
file_ext,
tmp_dir))
- self.logger.info("Headless file %s", headless_file)
- self.logger.info("Loading file %s into Hive", headless_file)
+ self.log.info("Headless file %s", headless_file)
+ self.log.info("Loading file %s into Hive", headless_file)
self.hive.load_file(headless_file,
self.hive_table,
field_dict=self.field_dict,
@@ -211,7 +211,7 @@ class S3ToHiveTransfer(BaseOperator):
raise AirflowException("Unable to retrieve header row from file")
field_names = self.field_dict.keys()
if len(field_names) != len(header_list):
- self.logger.warning("Headers count mismatch"
+ self.log.warning("Headers count mismatch"
"File headers:\n {header_list}\n"
"Field names: \n {field_names}\n"
"".format(**locals()))
@@ -219,7 +219,7 @@ class S3ToHiveTransfer(BaseOperator):
test_field_match = [h1.lower() == h2.lower()
for h1, h2 in zip(header_list, field_names)]
if not all(test_field_match):
- self.logger.warning("Headers do not match field names"
+ self.log.warning("Headers do not match field names"
"File headers:\n {header_list}\n"
"Field names: \n {field_names}\n"
"".format(**locals()))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index ea301dc..b5c43c2 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -15,7 +15,7 @@
from __future__ import print_function
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
from builtins import str
@@ -82,7 +82,7 @@ class BaseSensorOperator(BaseOperator):
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
- self.logger.info("Success criteria met. Exiting.")
+ self.log.info("Success criteria met. Exiting.")
class SqlSensor(BaseSensorOperator):
@@ -108,7 +108,7 @@ class SqlSensor(BaseSensorOperator):
def poke(self, context):
hook = BaseHook.get_connection(self.conn_id).get_hook()
- self.logger.info('Poking: %s', self.sql)
+ self.log.info('Poking: %s', self.sql)
records = hook.get_records(self.sql)
if not records:
return False
@@ -237,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator):
serialized_dttm_filter = ','.join(
[datetime.isoformat() for datetime in dttm_filter])
- self.logger.info(
+ self.log.info(
'Poking for '
'{self.external_dag_id}.'
'{self.external_task_id} on '
@@ -313,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
schema, table, partition = self.parse_partition_name(partition)
- self.logger.info(
+ self.log.info(
'Poking for {schema}.{table}/{partition}'.format(**locals())
)
return self.hook.check_for_named_partition(
@@ -371,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator):
def poke(self, context):
if '.' in self.table:
self.schema, self.table = self.table.split('.')
- self.logger.info(
+ self.log.info(
'Poking for table {self.schema}.{self.table}, '
'partition {self.partition}'.format(**locals()))
if not hasattr(self, 'hook'):
@@ -417,7 +417,7 @@ class HdfsSensor(BaseSensorOperator):
:return: (bool) depending on the matching criteria
"""
if size:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
size *= settings.MEGABYTE
result = [x for x in result if x['length'] >= size]
@@ -435,7 +435,7 @@ class HdfsSensor(BaseSensorOperator):
:return: (list) of dicts which were not removed
"""
if ignore_copying:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
ignored_extentions_regex = re.compile(regex_builder)
log.debug(
@@ -448,20 +448,20 @@ class HdfsSensor(BaseSensorOperator):
def poke(self, context):
sb = self.hook(self.hdfs_conn_id).get_conn()
- self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
try:
# IMO it's not right here, as there is no raise of any kind.
# if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
# it's not correct as the directory exists and sb does not raise any error
# here is a quick fix
result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
- self.logger.debug('HdfsSensor.poke: result is %s', result)
+ self.log.debug('HdfsSensor.poke: result is %s', result)
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
result = self.filter_for_filesize(result, self.file_size)
return bool(result)
except:
e = sys.exc_info()
- self.logger.debug("Caught an exception !: %s", str(e))
+ self.log.debug("Caught an exception !: %s", str(e))
return False
@@ -484,7 +484,7 @@ class WebHdfsSensor(BaseSensorOperator):
def poke(self, context):
from airflow.hooks.webhdfs_hook import WebHDFSHook
c = WebHDFSHook(self.webhdfs_conn_id)
- self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
return c.check_for_path(hdfs_path=self.filepath)
@@ -535,7 +535,7 @@ class S3KeySensor(BaseSensorOperator):
from airflow.hooks.S3_hook import S3Hook
hook = S3Hook(s3_conn_id=self.s3_conn_id)
full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
- self.logger.info('Poking for key : {full_url}'.format(**locals()))
+ self.log.info('Poking for key : {full_url}'.format(**locals()))
if self.wildcard_match:
return hook.check_for_wildcard_key(self.bucket_key,
self.bucket_name)
@@ -577,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator):
self.s3_conn_id = s3_conn_id
def poke(self, context):
- self.logger.info('Poking for prefix : {self.prefix}\n'
+ self.log.info('Poking for prefix : {self.prefix}\n'
'in bucket s3://{self.bucket_name}'.format(**locals()))
from airflow.hooks.S3_hook import S3Hook
hook = S3Hook(s3_conn_id=self.s3_conn_id)
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
self.target_time = target_time
def poke(self, context):
- self.logger.info('Checking if the time (%s) has come', self.target_time)
+ self.log.info('Checking if the time (%s) has come', self.target_time)
return datetime.now().time() > self.target_time
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
dag = context['dag']
target_dttm = dag.following_schedule(context['execution_date'])
target_dttm += self.delta
- self.logger.info('Checking if the time (%s) has come', target_dttm)
+ self.log.info('Checking if the time (%s) has come', target_dttm)
return datetime.now() > target_dttm
@@ -679,7 +679,7 @@ class HttpSensor(BaseSensorOperator):
http_conn_id=http_conn_id)
def poke(self, context):
- self.logger.info('Poking: %s', self.endpoint)
+ self.log.info('Poking: %s', self.endpoint)
try:
response = self.hook.run(self.endpoint,
data=self.request_params,
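For orientation, a minimal sketch of a custom sensor written against the renamed
property. The class name and poke body are hypothetical, and the 1.x import path
is assumed; BaseSensorOperator, poke(self, context) and .log come from the diff above.

    from airflow.operators.sensors import BaseSensorOperator

    class AlwaysTrueSensor(BaseSensorOperator):
        # Hypothetical sensor: pokes once and reports success.
        def poke(self, context):
            # self.log is the renamed replacement for the deprecated self.logger
            self.log.info('Poking from %s', self.task_id)
            return True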
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 4f2d7bc..8b21211 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -67,7 +67,7 @@ class SlackAPIOperator(BaseOperator):
rc = sc.api_call(self.method, **self.api_params)
if not rc['ok']:
msg = "Slack API call failed (%s)".format(rc['error'])
- self.logger.error(msg)
+ self.log.error(msg)
raise AirflowException(msg)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 7c85847..b32837d 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -41,6 +41,6 @@ class SqliteOperator(BaseOperator):
self.parameters = parameters or []
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
hook.run(self.sql, parameters=self.parameters)
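A hedged usage sketch for the operator above; task_id and sql are illustrative,
and sqlite_conn_id is assumed to name an existing connection.

    from airflow.operators.sqlite_operator import SqliteOperator

    run_sql = SqliteOperator(
        task_id='run_sql',                        # illustrative task id
        sql='CREATE TABLE IF NOT EXISTS t (x INT)',
        sqlite_conn_id='sqlite_default',          # assumed connection id
    )
    # At execute time the operator logs the statement via self.log
    # before handing it to SqliteHook.run, as in the diff above.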
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7c1d246..d5c3407 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,9 +25,9 @@ import re
import sys
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
class AirflowPluginException(Exception):
pass
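The module-level pattern introduced here (and repeated in kerberos.py,
settings.py, db.py and email.py below) reduces to this minimal sketch:

    from airflow.utils.log.logging_mixin import LoggingMixin

    # A throwaway LoggingMixin instance hands back a child of the root
    # logger named after the mixin's module and class; .log replaces
    # the old .logger property.
    log = LoggingMixin().log
    log.info('plugins_manager-style module-level logging')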
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index a9687b3..7a169b2 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -24,7 +24,7 @@ from airflow import configuration, LoggingMixin
NEED_KRB181_WORKAROUND = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def renew_from_kt():
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index cf1eca4..1e5e614 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -27,9 +27,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
from airflow import configuration as conf
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
class DummyStatsLogger(object):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 7794f4a..6a07db2 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -19,7 +19,7 @@ import json
import subprocess
import threading
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow import configuration as conf
from tempfile import mkstemp
@@ -39,7 +39,7 @@ class BaseTaskRunner(LoggingMixin):
"""
# Pass task instance context into log handlers to setup the logger.
self._task_instance = local_task_job.task_instance
- self.set_logger_contexts(self._task_instance)
+ self.set_log_contexts(self._task_instance)
popen_prepend = []
cfg_path = None
@@ -54,7 +54,7 @@ class BaseTaskRunner(LoggingMixin):
# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
if self.run_as_user and (self.run_as_user != getpass.getuser()):
- self.logger.debug("Planning to run as the %s user", self.run_as_user)
+ self.log.debug("Planning to run as the %s user", self.run_as_user)
cfg_dict = conf.as_dict(display_sensitive=True)
cfg_subset = {
'core': cfg_dict.get('core', {}),
@@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin):
line = line.decode('utf-8')
if len(line) == 0:
break
- self.logger.info('Subtask: %s', line.rstrip('\n'))
+ self.log.info('Subtask: %s', line.rstrip('\n'))
def run_command(self, run_with, join_args=False):
"""
@@ -112,7 +112,7 @@ class BaseTaskRunner(LoggingMixin):
"""
cmd = [" ".join(self._command)] if join_args else self._command
full_cmd = run_with + cmd
- self.logger.info('Running: %s', full_cmd)
+ self.log.info('Running: %s', full_cmd)
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
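The set_log_contexts rename above pairs with the LoggingMixin-subclass pattern;
a minimal sketch, where the class and method are hypothetical and task_instance
stands in for a real TaskInstance:

    class MyTaskRunner(LoggingMixin):
        # Hypothetical runner mirroring BaseTaskRunner's setup step.
        def configure(self, task_instance):
            # Pass task instance context into log handlers, as above.
            self.set_log_contexts(task_instance)
            self.log.info('log context set for %s', task_instance)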
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py
index b73e258..109b44c 100644
--- a/airflow/task_runner/bash_task_runner.py
+++ b/airflow/task_runner/bash_task_runner.py
@@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner):
def terminate(self):
if self.process and psutil.pid_exists(self.process.pid):
- kill_process_tree(self.logger, self.process.pid)
+ kill_process_tree(self.log, self.process.pid)
def on_finish(self):
super(BashTaskRunner, self).on_finish()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6497fcc..cc64f68 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -27,7 +27,7 @@ from datetime import datetime
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SimpleDag(BaseDag):
@@ -205,7 +205,7 @@ def list_py_file_paths(directory, safe_mode=True):
file_paths.append(file_path)
except Exception:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.exception("Error while examining %s", f)
return file_paths
@@ -443,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin):
if file_path in new_file_paths:
filtered_processors[file_path] = processor
else:
- self.logger.warning("Stopping processor for %s", file_path)
+ self.log.warning("Stopping processor for %s", file_path)
processor.stop()
self._processors = filtered_processors
@@ -519,7 +519,7 @@ class DagFileProcessorManager(LoggingMixin):
os.symlink(log_directory, latest_log_directory_path)
elif (os.path.isdir(latest_log_directory_path) or
os.path.isfile(latest_log_directory_path)):
- self.logger.warning(
+ self.log.warning(
"%s already exists as a dir/file. Skip creating symlink.",
latest_log_directory_path
)
@@ -558,7 +558,7 @@ class DagFileProcessorManager(LoggingMixin):
for file_path, processor in self._processors.items():
if processor.done:
- self.logger.info("Processor for %s finished", file_path)
+ self.log.info("Processor for %s finished", file_path)
now = datetime.now()
finished_processors[file_path] = processor
self._last_runtime[file_path] = (now -
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
simple_dags = []
for file_path, processor in finished_processors.items():
if processor.result is None:
- self.logger.warning(
+ self.log.warning(
"Processor for %s exited with return code %s. See %s for details.",
processor.file_path, processor.exit_code, processor.log_file
)
@@ -606,12 +606,12 @@ class DagFileProcessorManager(LoggingMixin):
set(files_paths_at_run_limit))
for file_path, processor in self._processors.items():
- self.logger.debug(
+ self.log.debug(
"File path %s is still being processed (started: %s)",
processor.file_path, processor.start_time.isoformat()
)
- self.logger.debug(
+ self.log.debug(
"Queuing the following files for processing:\n\t%s",
"\n\t".join(files_paths_to_queue)
)
@@ -626,7 +626,7 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._processor_factory(file_path, log_file_path)
processor.start()
- self.logger.info(
+ self.log.info(
"Started a process (PID: %s) to generate tasks for %s - logging into %s",
processor.pid, file_path, log_file_path
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index c7e58e7..ef2560f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -25,9 +25,9 @@ from sqlalchemy import event, exc
from sqlalchemy.pool import Pool
from airflow import settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def provide_session(func):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/email.py
----------------------------------------------------------------------
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index f252d55..fadd4d5 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -31,7 +31,7 @@ from email.utils import formatdate
from airflow import configuration
from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
@@ -88,7 +88,7 @@ def send_email_smtp(to, subject, html_content, files=None, dryrun=False, cc=None
def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
SMTP_HOST = configuration.get('smtp', 'SMTP_HOST')
SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/LoggingMixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/LoggingMixin.py b/airflow/utils/log/LoggingMixin.py
deleted file mode 100644
index 4572d63..0000000
--- a/airflow/utils/log/LoggingMixin.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import logging
-from builtins import object
-
-
-class LoggingMixin(object):
- """
- Convenience super-class to have a logger configured with the class name
- """
-
- @property
- def logger(self):
- try:
- return self._logger
- except AttributeError:
- self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
- return self._logger
-
- def set_logger_contexts(self, task_instance):
- """
- Set the context for all handlers of current logger.
- """
- for handler in self.logger.handlers:
- try:
- handler.set_context(task_instance)
- except AttributeError:
- pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 0bc0b5e..dcdaf6d 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -15,7 +15,7 @@ import os
from airflow import configuration
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
google_cloud_storage_conn_id=remote_conn_id
)
except:
- self.logger.error(
+ self.log.error(
'Could not create a GoogleCloudStorageHook with connection id '
'"%s". Please make sure that airflow[gcp_api] is installed '
'and the GCS connection exists.', remote_conn_id
@@ -137,7 +137,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
# return error if needed
if return_error:
msg = 'Could not read logs from {}'.format(remote_log_location)
- self.logger.error(msg)
+ self.log.error(msg)
return msg
def gcs_write(self, log, remote_log_location, append=True):
@@ -167,7 +167,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
tmpfile.flush()
self.hook.upload(bkt, blob, tmpfile.name)
except:
- self.logger.error('Could not write logs to %s', remote_log_location)
+ self.log.error('Could not write logs to %s', remote_log_location)
def parse_gcs_url(self, gsurl):
"""