Posted to commits@airflow.apache.org by da...@apache.org on 2017/07/21 01:07:32 UTC
incubator-airflow git commit: Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"
Repository: incubator-airflow
Updated Branches:
refs/heads/master e6ef06c53 -> b9576d57b
Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"
This reverts commit e6ef06c53fd4449db6e665cce5cad9418dde232f, which was committed accidentally.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9576d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9576d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9576d57
Branch: refs/heads/master
Commit: b9576d57b6063908e488654f0b21b338c10069fd
Parents: e6ef06c
Author: Dan Davydov <da...@airbnb.com>
Authored: Thu Jul 20 18:07:17 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Jul 20 18:07:17 2017 -0700
----------------------------------------------------------------------
airflow/bin/cli.py | 111 ++++++++++---
airflow/config_templates/__init__.py | 13 --
airflow/config_templates/default_airflow.cfg | 6 -
.../config_templates/default_airflow_logging.py | 73 ---------
airflow/settings.py | 13 +-
airflow/task_runner/base_task_runner.py | 1 -
airflow/utils/log/__init__.py | 13 --
airflow/utils/log/file_task_handler.py | 158 -------------------
airflow/utils/log/s3_task_handler.py | 90 -----------
airflow/utils/logging.py | 11 --
airflow/www/views.py | 104 +++++++++---
11 files changed, 177 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 11f415a..f568d5d 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,6 +22,7 @@ import os
import socket
import subprocess
import textwrap
+import warnings
from importlib import import_module
import argparse
@@ -51,6 +52,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
Connection)
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
+from airflow.utils import logging as logging_utils
+from airflow.utils.file import mkdirs
from airflow.www.app import cached_app
from sqlalchemy import func
@@ -324,6 +327,55 @@ def run(args, dag=None):
settings.configure_vars()
settings.configure_orm()
+ logging.root.handlers = []
+ if args.raw:
+ # Output to STDOUT for the parent process to read and log
+ logging.basicConfig(
+ stream=sys.stdout,
+ level=settings.LOGGING_LEVEL,
+ format=settings.LOG_FORMAT)
+ else:
+ # Setting up logging to a file.
+
+ # To handle log writing when tasks are impersonated, the log files need to
+ # be writable by the user that runs the Airflow command and the user
+ # that is impersonated. This is mainly to handle corner cases with the
+ # SubDagOperator. When the SubDagOperator is run, all of the operators
+ # run under the impersonated user and create appropriate log files
+ # as the impersonated user. However, if the user manually runs tasks
+ # of the SubDagOperator through the UI, then the log files are created
+ # by the user that runs the Airflow command. For example, the Airflow
+ # run command may be run by the `airflow_sudoable` user, but the Airflow
+ # tasks may be run by the `airflow` user. If the log files are not
+ # writable by both users, then it's possible that re-running a task
+ # via the UI (or vice versa) results in a permission error as the task
+ # tries to write to a log file created by the other user.
+ log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+ directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+ # Create the log file and give it group writable permissions
+ # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+ # operator is not compatible with impersonation (e.g. if a Celery executor is used
+ # for a SubDag operator and the SubDag operator has a different owner than the
+ # parent DAG)
+ if not os.path.exists(directory):
+ # Create the directory as globally writable using custom mkdirs
+ # as os.makedirs doesn't set mode properly.
+ mkdirs(directory, 0o775)
+ iso = args.execution_date.isoformat()
+ filename = "{directory}/{iso}".format(**locals())
+
+ if not os.path.exists(filename):
+ open(filename, "a").close()
+ os.chmod(filename, 0o666)
+
+ logging.basicConfig(
+ filename=filename,
+ level=settings.LOGGING_LEVEL,
+ format=settings.LOG_FORMAT)
+
+ hostname = socket.getfqdn()
+ logging.info("Running on host {}".format(hostname))
+
if not args.pickle and not dag:
dag = get_dag(args)
elif not dag:
@@ -339,21 +391,8 @@ def run(args, dag=None):
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()
- logger = logging.getLogger('airflow.task')
- if args.raw:
- logger = logging.getLogger('airflow.task.raw')
-
- for handler in logger.handlers:
- try:
- print("inside cli, setting up context")
- handler.set_context(ti)
- except AttributeError:
- pass
-
- hostname = socket.getfqdn()
- logger.info("Running on host {}".format(hostname))
-
if args.local:
+ print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
@@ -411,13 +450,43 @@ def run(args, dag=None):
if args.raw:
return
- # Force the log to flush. The flush is important because we
- # subsequently read from the log to insert into S3 or Google
- # cloud storage. Explicitly close the handler is needed in order
- # to upload to remote storage services.
- for handler in logger.handlers:
- handler.flush()
- handler.close()
+ # Force the log to flush, and set the handler to go back to normal so we
+ # don't continue logging to the task's log file. The flush is important
+ # because we subsequently read from the log to insert into S3 or Google
+ # cloud storage.
+ logging.root.handlers[0].flush()
+ logging.root.handlers = []
+
+ # store logs remotely
+ remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+ # deprecated as of March 2016
+ if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
+ warnings.warn(
+ 'The S3_LOG_FOLDER conf key has been replaced by '
+ 'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
+ 'update airflow.cfg to ensure future compatibility.',
+ DeprecationWarning)
+ remote_base = conf.get('core', 'S3_LOG_FOLDER')
+
+ if os.path.exists(filename):
+ # read log and remove old logs to get just the latest additions
+
+ with open(filename, 'r') as logfile:
+ log = logfile.read()
+
+ remote_log_location = filename.replace(log_base, remote_base)
+ # S3
+ if remote_base.startswith('s3:/'):
+ logging_utils.S3Log().write(log, remote_log_location)
+ # GCS
+ elif remote_base.startswith('gs:/'):
+ logging_utils.GCSLog().write(log, remote_log_location)
+ # Other
+ elif remote_base and remote_base != 'None':
+ logging.error(
+ 'Unsupported remote log location: {}'.format(remote_base))
+
def task_failed_deps(args):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/config_templates/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 77592d9..ddd1ba8 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -122,12 +122,6 @@ security =
# values at runtime)
unit_test_mode = False
-# Logging configuration path
-logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG
-
-# Name of handler to read task instance logs
-task_log_reader = airflow.task
-
[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
deleted file mode 100644
index 241571f..0000000
--- a/airflow/config_templates/default_airflow_logging.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow import configuration as conf
-
-# TODO: Logging format and level should be configured
-# in this file instead of from airflow.cfg. Currently
-# there are other log format and level configurations in
-# settings.py and cli.py.
-
-LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
-LOG_FORMAT = conf.get('core', 'log_format')
-
-
-BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
-# TODO: This should be specified as s3_remote and/or gcs_remote
-REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-DEFAULT_LOGGING_CONFIG = {
- 'version': 1,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'airflow.task': {
- 'format': LOG_FORMAT,
- }
- },
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
- 'formatter': 'airflow.task',
- 'stream': 'ext://sys.stdout'
- },
- 'file.task': {
- 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
- 'formatter': 'airflow.task',
- 'base_log_folder': BASE_LOG_FOLDER,
- },
- 's3.task': {
- 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
- 'base_log_folder': BASE_LOG_FOLDER,
- 'remote_base_log_folder': REMOTE_BASE_LOG_FOLDER,
- 'formatter': 'airflow.task',
- },
- },
- 'loggers': {
- 'airflow.task': {
- 'handlers': ['file.task'],
- 'level': LOG_LEVEL,
- 'propagate': False,
- },
- 'airflow.task_runner': {
- 'handlers': ['file.task'],
- 'level': LOG_LEVEL,
- 'propagate': True,
- },
- 'airflow.task.raw': {
- 'handlers': ['console'],
- 'level': LOG_LEVEL,
- 'propagate': False,
- },
- }
-}
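
The deleted dict above wired the `airflow.task` logger to the task-aware handlers that are also removed by this commit (FileTaskHandler, S3TaskHandler). Against the pre-revert tree, the intended consumption pattern, mirroring the logger/handler wiring removed from cli.py above, would be roughly as follows (a sketch, not code from this commit):

    import logging
    import logging.config

    # Pre-revert import; this module is deleted by the present commit.
    from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG

    def configure_task_logging(ti):
        # Install the dict config, then hand the TaskInstance to any handler
        # that knows how to derive a per-task log location from it.
        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
        logger = logging.getLogger('airflow.task')
        for handler in logger.handlers:
            # Only the task-aware handlers (file.task, s3.task) implement set_context.
            if hasattr(handler, 'set_context'):
                handler.set_context(ti)
        return logger
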
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 7a61bce..3f7560d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,7 +18,6 @@ from __future__ import print_function
from __future__ import unicode_literals
import logging
-import logging.config
import os
import sys
@@ -163,23 +162,13 @@ def configure_orm(disable_connection_pool=False):
try:
from airflow_local_settings import *
logging.info("Loaded airflow_local_settings.")
-except Exception:
+except:
pass
configure_logging()
configure_vars()
configure_orm()
-# TODO: Merge airflow logging configurations.
-logging_config_path = conf.get('core', 'logging_config_path')
-try:
- from logging_config_path import LOGGING_CONFIG
-except Exception:
- # Import default logging configuration
- from airflow.config_templates.default_airflow_logging import \
- DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
-logging.config.dictConfig(LOGGING_CONFIG)
-
# Const stuff
KILOBYTE = 1024
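
For the block removed from settings.py above: `from logging_config_path import LOGGING_CONFIG` imports a module literally named logging_config_path rather than the module named by the configuration value, so the fallback to the default config would effectively always be taken unless a module with that literal name happened to be importable. Resolving a dotted path from airflow.cfg generally requires importlib; a sketch of that pattern (assuming a value like "my_pkg.my_logging.LOGGING_CONFIG", not code from this commit) is:

    import logging.config
    from importlib import import_module

    def load_logging_config(dotted_path, default_config):
        # dotted_path names a module attribute, e.g. "my_pkg.my_logging.LOGGING_CONFIG"
        try:
            module_path, attr_name = dotted_path.rsplit('.', 1)
            config = getattr(import_module(module_path), attr_name)
        except (ImportError, AttributeError, ValueError):
            # Fall back to the packaged default when the custom path cannot be loaded.
            config = default_config
        logging.config.dictConfig(config)
        return config
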
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 02404d1..fa3fd6b 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -36,7 +36,6 @@ class BaseTaskRunner(LoggingMixin):
:type local_task_job: airflow.jobs.LocalTaskJob
"""
self._task_instance = local_task_job.task_instance
- self.set_logger_contexts(self._task_instance)
popen_prepend = []
cfg_path = None
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/utils/log/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
deleted file mode 100644
index 94588dd..0000000
--- a/airflow/utils/log/file_task_handler.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-
-from airflow import configuration as conf
-from airflow.configuration import AirflowConfigException
-from airflow.utils.file import mkdirs
-
-
-class FileTaskHandler(logging.Handler):
- """
- FileTaskHandler is a python log handler that handles and reads
- task instance logs. It creates and delegates log handling
- to `logging.FileHandler` after receiving task instance context.
- It reads logs from task instance's host machine.
- """
-
- def __init__(self, base_log_folder):
- super(FileTaskHandler, self).__init__()
- self.handler = None
- self.local_base = os.path.expanduser(base_log_folder)
-
- def set_context(self, task_instance):
- """
- Provide task_instance context to airflow task handler.
- :param task_instance: task instance object
- """
- self._init_file(task_instance)
- local_loc = self.get_local_loc(task_instance)
- self.handler = logging.FileHandler(local_loc)
- self.handler.setFormatter(self.formatter)
- self.handler.setLevel(self.level)
-
- def emit(self, record):
- if self.handler:
- self.handler.emit(record)
-
- def flush(self):
- if self.handler:
- self.handler.flush()
-
- def close(self):
- if self.handler:
- self.handler.close()
-
- def read(self, task_instance):
- """
- Read log of given task instance from task instance host server.
- :param task_instance: task instance database record
- """
- log = ""
- # Task instance here might be different from task instance when
- # initializing the handler. Thus explicitly getting log location
- # is needed to get correct log path.
- loc = self.get_local_loc(task_instance)
-
- if os.path.exists(loc):
- try:
- with open(loc) as f:
- log += "*** Reading local log.\n" + "".join(f.readlines())
- except Exception:
- log = "*** Failed to load local log file: {0}.\n".format(loc)
- else:
- url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log",
- self.get_log_relative_path(task_instance)).format(
- ti=task_instance,
- worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
- log += "*** Log file isn't local.\n"
- log += "*** Fetching here: {url}\n".format(**locals())
- try:
- import requests
- timeout = None # No timeout
- try:
- timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
- except (AirflowConfigException, ValueError):
- pass
-
- response = requests.get(url, timeout=timeout)
- response.raise_for_status()
- log += '\n' + response.text
- except Exception:
- log += "*** Failed to fetch log file from worker.\n".format(
- **locals())
- return log
-
- def get_log_relative_dir(self, ti):
- """
- Get relative log file directory.
- :param ti: task instance object
- """
- return "{}/{}".format(ti.dag_id, ti.task_id)
-
- def get_log_relative_path(self, ti):
- """
- Get relative log file path.
- :param ti: task instance object
- """
- directory = self.get_log_relative_dir(ti)
- filename = "{}.log".format(ti.execution_date.isoformat())
- return os.path.join(directory, filename)
-
- def get_local_loc(self, ti):
- """
- Get full local log path given task instance object.
- :param ti: task instance object
- """
- log_relative_path = self.get_log_relative_path(ti)
- return os.path.join(self.local_base, log_relative_path)
-
- def _init_file(self, ti):
- """
- Create log directory and give it correct permissions.
- :param ti: task instance object
- :return relative log path of the given task instance
- """
- # To handle log writing when tasks are impersonated, the log files need to
- # be writable by the user that runs the Airflow command and the user
- # that is impersonated. This is mainly to handle corner cases with the
- # SubDagOperator. When the SubDagOperator is run, all of the operators
- # run under the impersonated user and create appropriate log files
- # as the impersonated user. However, if the user manually runs tasks
- # of the SubDagOperator through the UI, then the log files are created
- # by the user that runs the Airflow command. For example, the Airflow
- # run command may be run by the `airflow_sudoable` user, but the Airflow
- # tasks may be run by the `airflow` user. If the log files are not
- # writable by both users, then it's possible that re-running a task
- # via the UI (or vice versa) results in a permission error as the task
- # tries to write to a log file created by the other user.
- relative_log_dir = self.get_log_relative_dir(ti)
- directory = os.path.join(self.local_base, relative_log_dir)
- # Create the log file and give it group writable permissions
- # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
- # operator is not compatible with impersonation (e.g. if a Celery executor is used
- # for a SubDag operator and the SubDag operator has a different owner than the
- # parent DAG)
- if not os.path.exists(directory):
- # Create the directory as globally writable using custom mkdirs
- # as os.makedirs doesn't set mode properly.
- mkdirs(directory, 0o775)
-
- local_loc = self.get_local_loc(ti)
-
- if not os.path.exists(local_loc):
- open(local_loc, "a").close()
- os.chmod(local_loc, 0o666)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
deleted file mode 100644
index 5c8ad7e..0000000
--- a/airflow/utils/log/s3_task_handler.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import warnings
-
-from airflow import configuration as conf
-from airflow.utils import logging as logging_utils
-from airflow.utils.log.file_task_handler import FileTaskHandler
-
-
-class S3TaskHandler(FileTaskHandler):
- """
- S3TaskHandler is a python log handler that handles and reads
- task instance logs. It extends airflow FileTaskHandler and
- uploads to and reads from S3 remote storage.
- """
-
- def __init__(self, base_log_folder, remote_base_log_folder):
- super(S3TaskHandler, self).__init__(base_log_folder)
- self.remote_base = remote_base_log_folder
- # deprecated as of March 2016
- if not self.remote_base and conf.get('core', 'S3_LOG_FOLDER'):
- warnings.warn(
- 'The S3_LOG_FOLDER conf key has been replaced by '
- 'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
- 'update airflow.cfg to ensure future compatibility.',
- DeprecationWarning)
- self.remote_base = conf.get('core', 'S3_LOG_FOLDER')
- self.closed = False
-
- def set_context(self, task_instance):
- super(S3TaskHandler, self).set_context(task_instance)
- # Local location and remote location is needed to open and
- # upload local log file to S3 remote storage.
- self.local_loc = self.get_local_loc(task_instance)
- self.remote_loc = os.path.join(self.remote_base,
- self.get_log_relative_path(task_instance))
-
- def close(self):
- """
- Close and upload local log file to remote storage S3.
- """
- # When application exit, system shuts down all handlers by
- # calling close method. Here we check if logger is already
- # closed to prevent uploading the log to remote storage multiple
- # times when `logging.shutdown` is called.
- if self.closed:
- return
-
- super(S3TaskHandler, self).close()
-
- if os.path.exists(self.local_loc):
- # read log and remove old logs to get just the latest additions
- with open(self.local_loc, 'r') as logfile:
- log = logfile.read()
- logging_utils.S3Log().write(log, self.remote_loc)
-
- self.closed = True
-
- def read(self, task_instance):
- """
- Read logs of given task instance from S3 remote storage. If failed,
- read the log from local machine.
- :param task_instance: task instance object
- """
- log = ""
- remote_loc = os.path.join(self.remote_base, self.get_log_relative_path(task_instance))
- # TODO: check if the remote_loc exist first instead of checking
- # if remote_log here. This logic is going to modified once logs are split
- # by try_number
- remote_log = logging_utils.S3Log().read(remote_loc, return_error=True)
- if remote_log:
- log += ('*** Reading remote log from {}.\n{}\n'.format(
- remote_loc, remote_log))
- else:
- log = super(S3TaskHandler, self).read(task_instance)
-
- return log
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 2df3086..96767cb 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -35,20 +35,9 @@ class LoggingMixin(object):
try:
return self._logger
except AttributeError:
- print(self.__class__.__module__ + '.' + self.__class__.__name__)
self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
return self._logger
- def set_logger_contexts(self, task_instance):
- """
- Set the context for all handlers of the logger.
- """
- for handler in self.logger.handlers:
- try:
- handler.set_context(task_instance)
- except AttributeError:
- pass
-
class S3Log(object):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c429765..6c39462 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,7 +16,6 @@
from past.builtins import basestring, unicode
import ast
-import logging
import os
import pkg_resources
import socket
@@ -73,10 +72,12 @@ from airflow.utils.json import json_ser
from airflow.utils.state import State
from airflow.utils.db import provide_session
from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils import logging as log_utils
from airflow.utils.dates import infer_time_unit, scale_time_units
from airflow.www import utils as wwwutils
from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
from airflow.www.validators import GreaterEqualThan
+from airflow.configuration import AirflowConfigException
QUERY_LIMIT = 100000
CHART_LIMIT = 200000
@@ -697,32 +698,99 @@ class Airflow(BaseView):
@login_required
@wwwutils.action_logging
def log(self):
+ BASE_LOG_FOLDER = os.path.expanduser(
+ conf.get('core', 'BASE_LOG_FOLDER'))
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
+ dag = dagbag.get_dag(dag_id)
+ log_relative = "{dag_id}/{task_id}/{execution_date}".format(
+ **locals())
+ loc = os.path.join(BASE_LOG_FOLDER, log_relative)
+ loc = loc.format(**locals())
+ log = ""
+ TI = models.TaskInstance
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
- dag = dagbag.get_dag(dag_id)
-
session = Session()
- ti = session.query(models.TaskInstance).filter(
- models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id,
- models.TaskInstance.execution_date == dttm).first()
+ ti = session.query(TI).filter(
+ TI.dag_id == dag_id, TI.task_id == task_id,
+ TI.execution_date == dttm).first()
+
if ti is None:
log = "*** Task instance did not exist in the DB\n"
else:
- logger = logging.getLogger('airflow.task')
- task_log_reader = conf.get('core', 'task_log_reader')
- for handler in logger.handlers:
- print("handler name is {}".format(handler.name))
- handler = next((handler for handler in logger.handlers
- if handler.name == task_log_reader), None)
- try:
- log = handler.read(ti)
- except AttributeError as e:
- log = "Task log handler {} does not support read logs.\n".format(
- task_log_reader)
- log += e.message
+ # load remote logs
+ remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+ remote_log_loaded = False
+ if remote_log_base:
+ remote_log_path = os.path.join(remote_log_base, log_relative)
+ remote_log = ""
+
+ # Only display errors reading the log if the task completed or ran at least
+ # once before (otherwise there won't be any remote log stored).
+ ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
+ ti_ran_more_than_once = ti.try_number > 1
+ surface_log_retrieval_errors = (
+ ti_execution_completed or ti_ran_more_than_once)
+
+ # S3
+ if remote_log_path.startswith('s3:/'):
+ remote_log += log_utils.S3Log().read(
+ remote_log_path, return_error=surface_log_retrieval_errors)
+ remote_log_loaded = True
+ # GCS
+ elif remote_log_path.startswith('gs:/'):
+ remote_log += log_utils.GCSLog().read(
+ remote_log_path, return_error=surface_log_retrieval_errors)
+ remote_log_loaded = True
+ # unsupported
+ else:
+ remote_log += '*** Unsupported remote log location.'
+
+ if remote_log:
+ log += ('*** Reading remote log from {}.\n{}\n'.format(
+ remote_log_path, remote_log))
+
+ # We only want to display the
+ # local logs while the task is running if a remote log configuration is set up
+ # since the logs will be transfered there after the run completes.
+ # TODO(aoen): One problem here is that if a task is running on a worker it
+ # already ran on, then duplicate logs will be printed for all of the previous
+ # runs of the task that already completed since they will have been printed as
+ # part of the remote log section above. This can be fixed either by streaming
+ # logs to the log servers as tasks are running, or by creating a proper
+ # abstraction for multiple task instance runs).
+ if not remote_log_loaded or ti.state == State.RUNNING:
+ if os.path.exists(loc):
+ try:
+ f = open(loc)
+ log += "*** Reading local log.\n" + "".join(f.readlines())
+ f.close()
+ except:
+ log = "*** Failed to load local log file: {0}.\n".format(loc)
+ else:
+ WORKER_LOG_SERVER_PORT = \
+ conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+ url = os.path.join(
+ "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+ ).format(**locals())
+ log += "*** Log file isn't local.\n"
+ log += "*** Fetching here: {url}\n".format(**locals())
+ try:
+ import requests
+ timeout = None # No timeout
+ try:
+ timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+ except (AirflowConfigException, ValueError):
+ pass
+
+ response = requests.get(url, timeout=timeout)
+ response.raise_for_status()
+ log += '\n' + response.text
+ except:
+ log += "*** Failed to fetch log file from worker.\n".format(
+ **locals())
if PY2 and not isinstance(log, unicode):
log = log.decode('utf-8')