Posted to commits@airflow.apache.org by da...@apache.org on 2017/07/21 01:07:32 UTC

incubator-airflow git commit: Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e6ef06c53 -> b9576d57b


Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"

This reverts commit e6ef06c53fd4449db6e665cce5cad9418dde232f which
was committed accidentally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9576d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9576d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9576d57

Branch: refs/heads/master
Commit: b9576d57b6063908e488654f0b21b338c10069fd
Parents: e6ef06c
Author: Dan Davydov <da...@airbnb.com>
Authored: Thu Jul 20 18:07:17 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Jul 20 18:07:17 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                              | 111 ++++++++++---
 airflow/config_templates/__init__.py            |  13 --
 airflow/config_templates/default_airflow.cfg    |   6 -
 .../config_templates/default_airflow_logging.py |  73 ---------
 airflow/settings.py                             |  13 +-
 airflow/task_runner/base_task_runner.py         |   1 -
 airflow/utils/log/__init__.py                   |  13 --
 airflow/utils/log/file_task_handler.py          | 158 -------------------
 airflow/utils/log/s3_task_handler.py            |  90 -----------
 airflow/utils/logging.py                        |  11 --
 airflow/www/views.py                            | 104 +++++++++---
 11 files changed, 177 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 11f415a..f568d5d 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,6 +22,7 @@ import os
 import socket
 import subprocess
 import textwrap
+import warnings
 from importlib import import_module
 
 import argparse
@@ -51,6 +52,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
                             Connection)
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
+from airflow.utils import logging as logging_utils
+from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -324,6 +327,55 @@ def run(args, dag=None):
         settings.configure_vars()
         settings.configure_orm()
 
+    logging.root.handlers = []
+    if args.raw:
+        # Output to STDOUT for the parent process to read and log
+        logging.basicConfig(
+            stream=sys.stdout,
+            level=settings.LOGGING_LEVEL,
+            format=settings.LOG_FORMAT)
+    else:
+        # Setting up logging to a file.
+
+        # To handle log writing when tasks are impersonated, the log files need to
+        # be writable by the user that runs the Airflow command and the user
+        # that is impersonated. This is mainly to handle corner cases with the
+        # SubDagOperator. When the SubDagOperator is run, all of the operators
+        # run under the impersonated user and create appropriate log files
+        # as the impersonated user. However, if the user manually runs tasks
+        # of the SubDagOperator through the UI, then the log files are created
+        # by the user that runs the Airflow command. For example, the Airflow
+        # run command may be run by the `airflow_sudoable` user, but the Airflow
+        # tasks may be run by the `airflow` user. If the log files are not
+        # writable by both users, then it's possible that re-running a task
+        # via the UI (or vice versa) results in a permission error as the task
+        # tries to write to a log file created by the other user.
+        log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+        directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+        # Create the log file and give it group writable permissions
+        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        # operator is not compatible with impersonation (e.g. if a Celery executor is used
+        # for a SubDag operator and the SubDag operator has a different owner than the
+        # parent DAG)
+        if not os.path.exists(directory):
+            # Create the directory as globally writable using custom mkdirs
+            # as os.makedirs doesn't set mode properly.
+            mkdirs(directory, 0o775)
+        iso = args.execution_date.isoformat()
+        filename = "{directory}/{iso}".format(**locals())
+
+        if not os.path.exists(filename):
+            open(filename, "a").close()
+            os.chmod(filename, 0o666)
+
+        logging.basicConfig(
+            filename=filename,
+            level=settings.LOGGING_LEVEL,
+            format=settings.LOG_FORMAT)
+
+    hostname = socket.getfqdn()
+    logging.info("Running on host {}".format(hostname))
+
     if not args.pickle and not dag:
         dag = get_dag(args)
     elif not dag:
@@ -339,21 +391,8 @@ def run(args, dag=None):
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
 
-    logger = logging.getLogger('airflow.task')
-    if args.raw:
-        logger = logging.getLogger('airflow.task.raw')
-
-    for handler in logger.handlers:
-        try:
-            print("inside cli, setting up context")
-            handler.set_context(ti)
-        except AttributeError:
-            pass
-
-    hostname = socket.getfqdn()
-    logger.info("Running on host {}".format(hostname))
-
     if args.local:
+        print("Logging into: " + filename)
         run_job = jobs.LocalTaskJob(
             task_instance=ti,
             mark_success=args.mark_success,
@@ -411,13 +450,43 @@ def run(args, dag=None):
     if args.raw:
         return
 
-    # Force the log to flush. The flush is important because we
-    # subsequently read from the log to insert into S3 or Google
-    # cloud storage. Explicitly close the handler is needed in order
-    # to upload to remote storage services.
-    for handler in logger.handlers:
-        handler.flush()
-        handler.close()
+    # Force the log to flush, and set the handler to go back to normal so we
+    # don't continue logging to the task's log file. The flush is important
+    # because we subsequently read from the log to insert into S3 or Google
+    # cloud storage.
+    logging.root.handlers[0].flush()
+    logging.root.handlers = []
+
+    # store logs remotely
+    remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+    # deprecated as of March 2016
+    if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
+        warnings.warn(
+            'The S3_LOG_FOLDER conf key has been replaced by '
+            'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
+            'update airflow.cfg to ensure future compatibility.',
+            DeprecationWarning)
+        remote_base = conf.get('core', 'S3_LOG_FOLDER')
+
+    if os.path.exists(filename):
+        # read log and remove old logs to get just the latest additions
+
+        with open(filename, 'r') as logfile:
+            log = logfile.read()
+
+        remote_log_location = filename.replace(log_base, remote_base)
+        # S3
+        if remote_base.startswith('s3:/'):
+            logging_utils.S3Log().write(log, remote_log_location)
+        # GCS
+        elif remote_base.startswith('gs:/'):
+            logging_utils.GCSLog().write(log, remote_log_location)
+        # Other
+        elif remote_base and remote_base != 'None':
+            logging.error(
+                'Unsupported remote log location: {}'.format(remote_base))
+
 
 def task_failed_deps(args):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/config_templates/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 77592d9..ddd1ba8 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -122,12 +122,6 @@ security =
 # values at runtime)
 unit_test_mode = False
 
-# Logging configuration path
-logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG
-
-# Name of handler to read task instance logs
-task_log_reader = airflow.task
-
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
deleted file mode 100644
index 241571f..0000000
--- a/airflow/config_templates/default_airflow_logging.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow import configuration as conf
-
-# TODO: Logging format and level should be configured
-# in this file instead of from airflow.cfg. Currently
-# there are other log format and level configurations in
-# settings.py and cli.py.
-
-LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
-LOG_FORMAT = conf.get('core', 'log_format')
-
-
-BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
-# TODO: This should be specified as s3_remote and/or gcs_remote
-REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-DEFAULT_LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': LOG_FORMAT,
-        }
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-        'file.task': {
-            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': BASE_LOG_FOLDER,
-        },
-        's3.task': {
-            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
-            'base_log_folder': BASE_LOG_FOLDER,
-            'remote_base_log_folder': REMOTE_BASE_LOG_FOLDER,
-            'formatter': 'airflow.task',
-        },
-    },
-    'loggers': {
-        'airflow.task': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-        'airflow.task_runner': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': True,
-        },
-        'airflow.task.raw': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-    }
-}
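
The DEFAULT_LOGGING_CONFIG dictionary deleted above was applied in settings.py (also reverted below) via logging.config.dictConfig, after which task code obtained a pre-configured logger with logging.getLogger('airflow.task'). A minimal, self-contained sketch of that pattern follows; the simplified dictionary and the log format string are illustrative assumptions, not part of this commit:

    import logging
    import logging.config

    # Simplified stand-in for DEFAULT_LOGGING_CONFIG (illustration only; the
    # real dictionary above also wires up file- and S3-backed task handlers).
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow.task': {'format': '[%(asctime)s] %(levelname)s - %(message)s'},
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow.task',
                'stream': 'ext://sys.stdout',
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['console'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger('airflow.task').info("handled by the console handler above")

With this revert applied, cli.py goes back to configuring the root logger directly through logging.basicConfig instead of routing task output through named loggers as sketched here.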

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 7a61bce..3f7560d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,7 +18,6 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
-import logging.config
 import os
 import sys
 
@@ -163,23 +162,13 @@ def configure_orm(disable_connection_pool=False):
 try:
     from airflow_local_settings import *
     logging.info("Loaded airflow_local_settings.")
-except Exception:
+except:
     pass
 
 configure_logging()
 configure_vars()
 configure_orm()
 
-# TODO: Merge airflow logging configurations.
-logging_config_path = conf.get('core', 'logging_config_path')
-try:
-    from logging_config_path import LOGGING_CONFIG
-except Exception:
-    # Import default logging configuration
-    from airflow.config_templates.default_airflow_logging import \
-        DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
-logging.config.dictConfig(LOGGING_CONFIG)
-
 # Const stuff
 
 KILOBYTE = 1024

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 02404d1..fa3fd6b 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -36,7 +36,6 @@ class BaseTaskRunner(LoggingMixin):
         :type local_task_job: airflow.jobs.LocalTaskJob
         """
         self._task_instance = local_task_job.task_instance
-        self.set_logger_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/utils/log/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
deleted file mode 100644
index 94588dd..0000000
--- a/airflow/utils/log/file_task_handler.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-
-from airflow import configuration as conf
-from airflow.configuration import AirflowConfigException
-from airflow.utils.file import mkdirs
-
-
-class FileTaskHandler(logging.Handler):
-    """
-    FileTaskHandler is a python log handler that handles and reads
-    task instance logs. It creates and delegates log handling
-    to `logging.FileHandler` after receiving task instance context.
-    It reads logs from task instance's host machine.
-    """
-
-    def __init__(self, base_log_folder):
-        super(FileTaskHandler, self).__init__()
-        self.handler = None
-        self.local_base = os.path.expanduser(base_log_folder)
-
-    def set_context(self, task_instance):
-        """
-        Provide task_instance context to airflow task handler.
-        :param task_instance: task instance object
-        """
-        self._init_file(task_instance)
-        local_loc = self.get_local_loc(task_instance)
-        self.handler = logging.FileHandler(local_loc)
-        self.handler.setFormatter(self.formatter)
-        self.handler.setLevel(self.level)
-
-    def emit(self, record):
-        if self.handler:
-            self.handler.emit(record)
-
-    def flush(self):
-        if self.handler:
-            self.handler.flush()
-
-    def close(self):
-        if self.handler:
-            self.handler.close()
-
-    def read(self, task_instance):
-        """
-        Read log of given task instance from task instance host server.
-        :param task_instance: task instance database record
-        """
-        log = ""
-        # Task instance here might be different from task instance when
-        # initializing the handler. Thus explicitly getting log location
-        # is needed to get correct log path.
-        loc = self.get_local_loc(task_instance)
-
-        if os.path.exists(loc):
-            try:
-                with open(loc) as f:
-                    log += "*** Reading local log.\n" + "".join(f.readlines())
-            except Exception:
-                log = "*** Failed to load local log file: {0}.\n".format(loc)
-        else:
-            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log",
-                               self.get_log_relative_path(task_instance)).format(
-                ti=task_instance,
-                worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
-            log += "*** Log file isn't local.\n"
-            log += "*** Fetching here: {url}\n".format(**locals())
-            try:
-                import requests
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                response = requests.get(url, timeout=timeout)
-                response.raise_for_status()
-                log += '\n' + response.text
-            except Exception:
-                log += "*** Failed to fetch log file from worker.\n".format(
-                    **locals())
-        return log
-
-    def get_log_relative_dir(self, ti):
-        """
-        Get relative log file directory.
-        :param ti: task instance object
-        """
-        return "{}/{}".format(ti.dag_id, ti.task_id)
-
-    def get_log_relative_path(self, ti):
-        """
-        Get relative log file path.
-        :param ti: task instance object
-        """
-        directory = self.get_log_relative_dir(ti)
-        filename = "{}.log".format(ti.execution_date.isoformat())
-        return os.path.join(directory, filename)
-
-    def get_local_loc(self, ti):
-        """
-        Get full local log path given task instance object.
-        :param ti: task instance object
-        """
-        log_relative_path = self.get_log_relative_path(ti)
-        return os.path.join(self.local_base, log_relative_path)
-
-    def _init_file(self, ti):
-        """
-        Create log directory and give it correct permissions.
-        :param ti: task instance object
-        :return relative log path of the given task instance
-        """
-        # To handle log writing when tasks are impersonated, the log files need to
-        # be writable by the user that runs the Airflow command and the user
-        # that is impersonated. This is mainly to handle corner cases with the
-        # SubDagOperator. When the SubDagOperator is run, all of the operators
-        # run under the impersonated user and create appropriate log files
-        # as the impersonated user. However, if the user manually runs tasks
-        # of the SubDagOperator through the UI, then the log files are created
-        # by the user that runs the Airflow command. For example, the Airflow
-        # run command may be run by the `airflow_sudoable` user, but the Airflow
-        # tasks may be run by the `airflow` user. If the log files are not
-        # writable by both users, then it's possible that re-running a task
-        # via the UI (or vice versa) results in a permission error as the task
-        # tries to write to a log file created by the other user.
-        relative_log_dir = self.get_log_relative_dir(ti)
-        directory = os.path.join(self.local_base, relative_log_dir)
-        # Create the log file and give it group writable permissions
-        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-        # operator is not compatible with impersonation (e.g. if a Celery executor is used
-        # for a SubDag operator and the SubDag operator has a different owner than the
-        # parent DAG)
-        if not os.path.exists(directory):
-            # Create the directory as globally writable using custom mkdirs
-            # as os.makedirs doesn't set mode properly.
-            mkdirs(directory, 0o775)
-
-        local_loc = self.get_local_loc(ti)
-
-        if not os.path.exists(local_loc):
-            open(local_loc, "a").close()
-            os.chmod(local_loc, 0o666)
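
Both this handler and the restored cli.py code create log directories with the custom airflow.utils.file.mkdirs helper because, as the comment above notes, the mode passed to os.makedirs is subject to the process umask and so the directories do not reliably end up with the requested 0o775. A rough sketch of that technique, using a hypothetical helper name (this is not the actual airflow.utils.file.mkdirs implementation):

    import os

    def mkdirs_with_mode(path, mode):
        """Recursively create `path`, chmod-ing each directory created along
        the way so the requested mode survives the process umask (sketch only)."""
        path = os.path.abspath(path)
        missing = []
        while not os.path.exists(path):
            missing.append(path)
            path = os.path.dirname(path)
        for directory in reversed(missing):
            os.mkdir(directory)
            # os.mkdir's mode argument is masked by the umask, so set it explicitly.
            os.chmod(directory, mode)

The individual log file is then given 0o666 via os.chmod, as in _init_file above, so that a task re-run by a different OS user (for example through the UI) can still append to the same file.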

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
deleted file mode 100644
index 5c8ad7e..0000000
--- a/airflow/utils/log/s3_task_handler.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import warnings
-
-from airflow import configuration as conf
-from airflow.utils import logging as logging_utils
-from airflow.utils.log.file_task_handler import FileTaskHandler
-
-
-class S3TaskHandler(FileTaskHandler):
-    """
-    S3TaskHandler is a python log handler that handles and reads
-    task instance logs. It extends airflow FileTaskHandler and
-    uploads to and reads from S3 remote storage.
-    """
-
-    def __init__(self, base_log_folder, remote_base_log_folder):
-        super(S3TaskHandler, self).__init__(base_log_folder)
-        self.remote_base = remote_base_log_folder
-        # deprecated as of March 2016
-        if not self.remote_base and conf.get('core', 'S3_LOG_FOLDER'):
-            warnings.warn(
-                'The S3_LOG_FOLDER conf key has been replaced by '
-                'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
-                'update airflow.cfg to ensure future compatibility.',
-                DeprecationWarning)
-            self.remote_base = conf.get('core', 'S3_LOG_FOLDER')
-        self.closed = False
-
-    def set_context(self, task_instance):
-        super(S3TaskHandler, self).set_context(task_instance)
-        # Local location and remote location is needed to open and
-        # upload local log file to S3 remote storage.
-        self.local_loc = self.get_local_loc(task_instance)
-        self.remote_loc = os.path.join(self.remote_base,
-                                       self.get_log_relative_path(task_instance))
-
-    def close(self):
-        """
-        Close and upload local log file to remote storage S3.
-        """
-        # When application exit, system shuts down all handlers by
-        # calling close method. Here we check if logger is already
-        # closed to prevent uploading the log to remote storage multiple
-        # times when `logging.shutdown` is called.
-        if self.closed:
-            return
-
-        super(S3TaskHandler, self).close()
-
-        if os.path.exists(self.local_loc):
-            # read log and remove old logs to get just the latest additions
-            with open(self.local_loc, 'r') as logfile:
-                log = logfile.read()
-            logging_utils.S3Log().write(log, self.remote_loc)
-
-        self.closed = True
-
-    def read(self, task_instance):
-        """
-        Read logs of given task instance from S3 remote storage. If failed,
-        read the log from local machine.
-        :param task_instance: task instance object
-        """
-        log = ""
-        remote_loc = os.path.join(self.remote_base, self.get_log_relative_path(task_instance))
-        # TODO: check if the remote_loc exist first instead of checking
-        # if remote_log here. This logic is going to modified once logs are split
-        # by try_number
-        remote_log = logging_utils.S3Log().read(remote_loc, return_error=True)
-        if remote_log:
-            log += ('*** Reading remote log from {}.\n{}\n'.format(
-                remote_loc, remote_log))
-        else:
-            log = super(S3TaskHandler, self).read(task_instance)
-
-        return log

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 2df3086..96767cb 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -35,20 +35,9 @@ class LoggingMixin(object):
         try:
             return self._logger
         except AttributeError:
-            print(self.__class__.__module__ + '.' + self.__class__.__name__)
             self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
             return self._logger
 
-    def set_logger_contexts(self, task_instance):
-        """
-        Set the context for all handlers of the logger.
-        """
-        for handler in self.logger.handlers:
-            try:
-                handler.set_context(task_instance)
-            except AttributeError:
-                pass
-
 
 class S3Log(object):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c429765..6c39462 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,7 +16,6 @@
 from past.builtins import basestring, unicode
 
 import ast
-import logging
 import os
 import pkg_resources
 import socket
@@ -73,10 +72,12 @@ from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import provide_session
 from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils import logging as log_utils
 from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
+from airflow.configuration import AirflowConfigException
 
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000
@@ -697,32 +698,99 @@ class Airflow(BaseView):
     @login_required
     @wwwutils.action_logging
     def log(self):
+        BASE_LOG_FOLDER = os.path.expanduser(
+            conf.get('core', 'BASE_LOG_FOLDER'))
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dag = dagbag.get_dag(dag_id)
+        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
+            **locals())
+        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
+        loc = loc.format(**locals())
+        log = ""
+        TI = models.TaskInstance
         dttm = dateutil.parser.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-
         session = Session()
-        ti = session.query(models.TaskInstance).filter(
-            models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id,
-            models.TaskInstance.execution_date == dttm).first()
+        ti = session.query(TI).filter(
+            TI.dag_id == dag_id, TI.task_id == task_id,
+            TI.execution_date == dttm).first()
+
         if ti is None:
             log = "*** Task instance did not exist in the DB\n"
         else:
-            logger = logging.getLogger('airflow.task')
-            task_log_reader = conf.get('core', 'task_log_reader')
-            for handler in logger.handlers:
-                print("handler name is {}".format(handler.name))
-            handler = next((handler for handler in logger.handlers
-                           if handler.name == task_log_reader), None)
-            try:
-                log = handler.read(ti)
-            except AttributeError as e:
-                log = "Task log handler {} does not support read logs.\n".format(
-                    task_log_reader)
-                log += e.message
+            # load remote logs
+            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+            remote_log_loaded = False
+            if remote_log_base:
+                remote_log_path = os.path.join(remote_log_base, log_relative)
+                remote_log = ""
+
+                # Only display errors reading the log if the task completed or ran at least
+                # once before (otherwise there won't be any remote log stored).
+                ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
+                ti_ran_more_than_once = ti.try_number > 1
+                surface_log_retrieval_errors = (
+                    ti_execution_completed or ti_ran_more_than_once)
+
+                # S3
+                if remote_log_path.startswith('s3:/'):
+                    remote_log += log_utils.S3Log().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
+                # GCS
+                elif remote_log_path.startswith('gs:/'):
+                    remote_log += log_utils.GCSLog().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
+                # unsupported
+                else:
+                    remote_log += '*** Unsupported remote log location.'
+
+                if remote_log:
+                    log += ('*** Reading remote log from {}.\n{}\n'.format(
+                        remote_log_path, remote_log))
+
+            # We only want to display the
+            # local logs while the task is running if a remote log configuration is set up
+            # since the logs will be transfered there after the run completes.
+            # TODO(aoen): One problem here is that if a task is running on a worker it
+            # already ran on, then duplicate logs will be printed for all of the previous
+            # runs of the task that already completed since they will have been printed as
+            # part of the remote log section above. This can be fixed either by streaming
+            # logs to the log servers as tasks are running, or by creating a proper
+            # abstraction for multiple task instance runs).
+            if not remote_log_loaded or ti.state == State.RUNNING:
+                if os.path.exists(loc):
+                    try:
+                        f = open(loc)
+                        log += "*** Reading local log.\n" + "".join(f.readlines())
+                        f.close()
+                    except:
+                        log = "*** Failed to load local log file: {0}.\n".format(loc)
+                else:
+                    WORKER_LOG_SERVER_PORT = \
+                        conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+                    url = os.path.join(
+                        "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+                    ).format(**locals())
+                    log += "*** Log file isn't local.\n"
+                    log += "*** Fetching here: {url}\n".format(**locals())
+                    try:
+                        import requests
+                        timeout = None  # No timeout
+                        try:
+                            timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                        except (AirflowConfigException, ValueError):
+                            pass
+
+                        response = requests.get(url, timeout=timeout)
+                        response.raise_for_status()
+                        log += '\n' + response.text
+                    except:
+                        log += "*** Failed to fetch log file from worker.\n".format(
+                            **locals())
 
         if PY2 and not isinstance(log, unicode):
             log = log.decode('utf-8')