Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/29 19:46:38 UTC

incubator-airflow git commit: [AIRFLOW-2402] Fix RBAC task log

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9b661fa61 -> 7c3435442


[AIRFLOW-2402] Fix RBAC task log

Closes #3319 from yrqls21/kevin_yang_fix_rbac_view


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c343544
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c343544
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c343544

Branch: refs/heads/master
Commit: 7c34354427f8c047b2cfb3f472a38bc50fe91d45
Parents: 9b661fa
Author: Kevin Yang <ke...@airbnb.com>
Authored: Tue May 29 20:46:01 2018 +0100
Committer: Kaxil Naik <ka...@apache.org>
Committed: Tue May 29 20:46:01 2018 +0100

----------------------------------------------------------------------
 airflow/configuration.py                        |   4 +-
 airflow/www/templates/airflow/ti_log.html       |   1 -
 airflow/www/views.py                            |  11 +-
 airflow/www_rbac/templates/airflow/ti_log.html  | 158 +++++++++++++++----
 airflow/www_rbac/views.py                       |  92 ++++++++---
 .../2017-09-01T00.00.00+00.00/1.log             |   1 +
 .../2017-09-01T00.00.00/1.log                   |   1 -
 tests/www/test_views.py                         |  21 ++-
 .../2017-09-01T00.00.00+00.00/1.log             |   1 +
 .../2017-09-01T00.00.00/1.log                   |   1 -
 tests/www_rbac/test_views.py                    | 117 +++++++++++++-
 11 files changed, 337 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e19a8b1..2ee453f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -346,7 +346,7 @@ class AirflowConfigParser(ConfigParser):
         Note: this is not reversible.
         """
         # override any custom settings with defaults
-        self.defaults.read_string(parameterized_config(DEFAULT_CONFIG))
+        self.read_string(parameterized_config(DEFAULT_CONFIG))
         # then read test config
         self.read_string(parameterized_config(TEST_CONFIG))
         # then read any "custom" test settings
@@ -446,8 +446,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
 log.info("Reading the config from %s", AIRFLOW_CONFIG)
 
 conf = AirflowConfigParser(default_config=parameterized_config(DEFAULT_CONFIG))
+
 conf.read(AIRFLOW_CONFIG)
 
+
 if conf.getboolean('webserver', 'rbac'):
     with open(os.path.join(_templates_dir, 'default_webserver_config.py')) as f:
         DEFAULT_WEBSERVER_CONFIG = f.read()
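
For reference, the load_test_config() change above matters because reading the
default config into the parser itself (rather than into the separate
self.defaults parser) lets the subsequent read_string() calls layer the test
settings on top of the defaults. A minimal sketch of that layering with the
standard-library ConfigParser; the section and option values are illustrative,
not Airflow's real defaults:

from configparser import ConfigParser

DEFAULT_CONFIG = """
[webserver]
rbac = False
"""

TEST_CONFIG = """
[webserver]
rbac = True
"""

parser = ConfigParser()
# Read the defaults into the parser itself so later read_string() calls
# can override them, mirroring the fixed load_test_config() above.
parser.read_string(DEFAULT_CONFIG)
parser.read_string(TEST_CONFIG)
print(parser.getboolean('webserver', 'rbac'))  # True: the test value wins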

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
index 2615c45..2b50061 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -41,7 +41,6 @@ limitations under the License.
   </div>
 {% endblock %}
 {% block tail %}
-{{ lib.form_js() }}
 {{ super() }}
 <script>
     // TODO: make those constants configurable.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 4860dbd..eb29b89 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -733,10 +733,12 @@ class Airflow(BaseView):
         execution_date = request.args.get('execution_date')
         dttm = pendulum.parse(execution_date)
         try_number = int(request.args.get('try_number'))
-        # metadata may be None
         metadata = request.args.get('metadata')
-        if metadata:
-            metadata = json.loads(metadata)
+        metadata = json.loads(metadata)
+
+        # metadata may be null
+        if not metadata:
+            metadata = {}
 
         # Convert string datetime into actual datetime
         try:
@@ -779,9 +781,6 @@ class Airflow(BaseView):
                              .format(task_log_reader, str(e))]
             metadata['end_of_log'] = True
             return jsonify(message=error_message, error=True, metadata=metadata)
-        except AirflowException as e:
-            metadata['end_of_log'] = True
-            return jsonify(message=str(e), error=True, metadata=metadata)
 
     @expose('/log')
     @login_required
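
The hunk above stops treating a missing or false-y metadata parameter as
"skip json.loads" and instead always decodes it, then falls back to an empty
dict. A minimal standalone sketch of that normalization (the function name is
made up for illustration; in the view the raw value comes from request.args):

import json

def normalize_metadata(raw):
    # "null", "{}" and a real JSON object are all valid inputs; anything
    # false-y after decoding becomes an empty dict before it is handed to
    # the task log handler.
    metadata = json.loads(raw)
    if not metadata:
        metadata = {}
    return metadata

print(normalize_metadata('null'))                    # {}
print(normalize_metadata('{}'))                      # {}
print(normalize_metadata('{"end_of_log": false}'))   # {'end_of_log': False}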

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/templates/airflow/ti_log.html b/airflow/www_rbac/templates/airflow/ti_log.html
index 79aee89..c873f67 100644
--- a/airflow/www_rbac/templates/airflow/ti_log.html
+++ b/airflow/www_rbac/templates/airflow/ti_log.html
@@ -1,40 +1,140 @@
 {#
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 #}
 {% extends "airflow/task_instance.html" %}
 {% block title %}Airflow - DAGs{% endblock %}
 
 {% block content %}
-  {{ super() }}
-  <h4>{{ title }}</h4>
-  <ul class="nav nav-pills" role="tablist">
-    {% for log in logs %}
-      <li role="presentation" class="{{ 'active' if loop.last else '' }}">
-        <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
-          {{ loop.index }}
-        </a>
-      </li>
-    {% endfor %}
-  </ul>
-  <div class="tab-content">
-    {% for log in logs %}
-      <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
-        <pre id="attempt-{{ loop.index }}">{{ log }}</pre>
-      </div>
-    {% endfor %}
+{{ super() }}
+<h4>{{ title }}</h4>
+<ul class="nav nav-pills" role="tablist">
+  {% for log in logs %}
+  <li role="presentation" class="{{ 'active' if loop.last else '' }}">
+    <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
+      {{ loop.index }}
+    </a>
+  </li>
+  {% endfor %}
+</ul>
+<div class="tab-content">
+  {% for log in logs %}
+  <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
+    <img id="loading-{{ loop.index }}" style="margin-top:0%; margin-left:50%; height:50px; width:50px; position: absolute;"
+         alt="spinner" src="{{ url_for('static', filename='loading.gif') }}">
+    <pre><code id="try-{{ loop.index }}">{{ log }}</code></pre>
   </div>
+  {% endfor %}
+  </div>
+{% endblock %}
+{% block tail %}
+{{ super() }}
+<script>
+    // TODO: make those constants configurable.
+    // Time interval to wait before next log fetching. Default 2s.
+    const DELAY = 2e3;
+    // Distance away from page bottom to enable auto tailing.
+    const AUTO_TAILING_OFFSET = 30;
+    // Animation speed for auto tailing log display.
+    const ANIMATION_SPEED = 1000;
+    // Total number of tabs to show.
+    const TOTAL_ATTEMPTS = "{{ logs|length }}";
+
+    // Recursively fetch logs from flask endpoint.
+    function recurse(delay=DELAY) {
+      return new Promise((resolve) => setTimeout(resolve, delay));
+    }
+
+    // Enable auto tailing only when users scroll down to the bottom
+    // of the page. This prevents auto tailing the page if users want
+    // to view earlier rendered messages.
+    function checkAutoTailingCondition() {
+      const docHeight = $(document).height();
+      console.debug($(window).scrollTop())
+      console.debug($(window).height())
+      console.debug($(document).height())
+      return $(window).scrollTop() != 0
+             && ($(window).scrollTop() + $(window).height() > docHeight - AUTO_TAILING_OFFSET);
+    }
+
+    // Streaming log with auto-tailing.
+    function autoTailingLog(try_number, metadata=null, auto_tailing=false) {
+      console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \
+       execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata));
+
+      return Promise.resolve(
+        $.ajax({
+          url: "{{ url_for("Airflow.get_logs_with_metadata") }}",
+          data: {
+            dag_id: "{{ dag_id }}",
+            task_id: "{{ task_id }}",
+            execution_date: "{{ execution_date }}",
+            try_number: try_number,
+            metadata: JSON.stringify(metadata),
+          },
+        })).then(res => {
+          // Stop recursive call to backend when error occurs.
+          if (!res) {
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+          // res.error is a boolean
+          // res.message is the log itself or the error message
+          if (res.error) {
+            if (res.message) {
+              console.error("Error while retrieving log: " + res.message);
+            }
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+
+          if (res.message) {
+            // Auto scroll window to the end if current window location is near the end.
+            if(auto_tailing && checkAutoTailingCondition()) {
+              var should_scroll = true
+            }
+            // The message may contain HTML, so either have to escape it or write it as text.
+            document.getElementById(`try-${try_number}`).textContent += res.message + "\n";
+            // Auto scroll window to the end if current window location is near the end.
+            if(should_scroll) {
+              $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED);
+            }
+          }
+
+          if (res.metadata.end_of_log) {
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+          return recurse().then(() => autoTailingLog(
+            try_number, res.metadata, auto_tailing));
+        });
+    }
+    $(document).ready(function() {
+      // Lazily load all past task instance logs.
+      // TODO: We only need to have recursive queries for
+      // latest running task instances. Currently it does not
+      // work well with ElasticSearch because ES query only
+      // returns at most 10k documents. We want the ability
+      // to display all logs in the front-end.
+      // An optimization here is to render from latest attempt.
+      for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
+        // Only auto_tailing the page when streaming the latest attempt.
+        autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS));
+      }
+    });
+
+</script>
 {% endblock %}
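
The new template polls get_logs_with_metadata every DELAY milliseconds,
appends res.message to the <code> block for the attempt, and stops once
res.metadata.end_of_log is set or an error comes back. A rough Python sketch
of the same client-side polling contract, assuming the requests library and a
placeholder BASE_URL for a locally running webserver:

import json
import time

import requests

# Placeholder URL; the non-RBAC UI serves the endpoint under /admin/airflow,
# the RBAC UI under its own Airflow view prefix.
BASE_URL = 'http://localhost:8080/admin/airflow/get_logs_with_metadata'

def tail_log(dag_id, task_id, execution_date, try_number, delay=2.0):
    metadata = None
    while True:
        body = requests.get(BASE_URL, params={
            'dag_id': dag_id,
            'task_id': task_id,
            'execution_date': execution_date,
            'try_number': try_number,
            'metadata': json.dumps(metadata),
        }).json()
        if body.get('error'):
            print(body.get('message'))
            return
        if body.get('message'):
            print(body['message'], end='')
        metadata = body.get('metadata', {})
        if metadata.get('end_of_log'):
            return
        time.sleep(delay)

# Example: tail_log('example_dag', 'example_task',
#                   '2017-09-01T00:00:00+00:00', 1)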

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index f508720..ebd1005 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -38,7 +38,7 @@ from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
     g, redirect, request, Markup, Response, render_template,
-    make_response, flash)
+    make_response, flash, jsonify)
 from flask._compat import PY2
 
 from flask_appbuilder import BaseView, ModelView, expose, has_access
@@ -417,6 +417,65 @@ class Airflow(AirflowBaseView):
             form=form,
             title=title, )
 
+    @expose('/get_logs_with_metadata')
+    @has_access
+    @action_logging
+    @provide_session
+    def get_logs_with_metadata(self, session=None):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = pendulum.parse(execution_date)
+        try_number = int(request.args.get('try_number'))
+        metadata = request.args.get('metadata')
+        metadata = json.loads(metadata)
+
+        # metadata may be null
+        if not metadata:
+            metadata = {}
+
+        # Convert string datetime into actual datetime
+        try:
+            execution_date = timezone.parse(execution_date)
+        except ValueError:
+            error_message = (
+                'Given execution date, {}, could not be identified '
+                'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                    execution_date))
+            response = jsonify({'error': error_message})
+            response.status_code = 400
+
+            return response
+
+        logger = logging.getLogger('airflow.task')
+        task_log_reader = conf.get('core', 'task_log_reader')
+        handler = next((handler for handler in logger.handlers
+                        if handler.name == task_log_reader), None)
+
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id,
+            models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
+        try:
+            if ti is None:
+                logs = ["*** Task instance did not exist in the DB\n"]
+                metadata['end_of_log'] = True
+            else:
+                dag = dagbag.get_dag(dag_id)
+                ti.task = dag.get_task(ti.task_id)
+                logs, metadatas = handler.read(ti, try_number, metadata=metadata)
+                metadata = metadatas[0]
+            for i, log in enumerate(logs):
+                if PY2 and not isinstance(log, unicode):
+                    logs[i] = log.decode('utf-8')
+            message = logs[0]
+            return jsonify(message=message, metadata=metadata)
+        except AttributeError as e:
+            error_message = ["Task log handler {} does not support read logs.\n{}\n"
+                             .format(task_log_reader, str(e))]
+            metadata['end_of_log'] = True
+            return jsonify(message=error_message, error=True, metadata=metadata)
+
     @expose('/log')
     @has_access
     @action_logging
@@ -428,34 +487,17 @@ class Airflow(AirflowBaseView):
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
-        ti = (
-            session.query(models.TaskInstance)
-                   .filter(models.TaskInstance.dag_id == dag_id,
-                           models.TaskInstance.task_id == task_id,
-                           models.TaskInstance.execution_date == dttm)
-                   .first()
-        )
-        if ti is None:
-            logs = ["*** Task instance did not exist in the DB\n"]
-        else:
-            logger = logging.getLogger('airflow.task')
-            task_log_reader = conf.get('core', 'task_log_reader')
-            handler = next((handler for handler in logger.handlers
-                            if handler.name == task_log_reader), None)
-            try:
-                ti.task = dag.get_task(ti.task_id)
-                logs = handler.read(ti)
-            except AttributeError as e:
-                logs = ["Task log handler {} does not support read logs.\n{}\n"
-                        .format(task_log_reader, str(e))]
 
-        for i, log in enumerate(logs):
-            if PY2 and not isinstance(log, unicode):
-                logs[i] = log.decode('utf-8')
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id,
+            models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
 
+        logs = [''] * (ti.next_try_number - 1 if ti is not None else 0)
         return self.render(
             'airflow/ti_log.html',
-            logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
+            logs=logs, dag=dag, title="Log by attempts",
+            dag_id=dag.dag_id, task_id=task_id,
             execution_date=execution_date, form=form)
 
     @expose('/task')
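
Every return path of the new endpoint produces a JSON body the template's
JavaScript can key on: message plus metadata on success, error=True with
end_of_log metadata on handler failures, and a plain error string with HTTP
400 for an unparseable execution date. A small sketch of how a caller might
interpret those shapes (the helper name and the policy of raising are
illustrative, not part of the view):

def interpret_log_response(body):
    # Error responses still carry a message, and handler errors also set
    # metadata['end_of_log'] so pollers know to stop retrying.
    if body.get('error'):
        raise RuntimeError(body.get('message'))
    chunk = body.get('message') or ''
    done = body.get('metadata', {}).get('end_of_log', False)
    return chunk, done

chunk, done = interpret_log_response(
    {'message': 'Log for testing.\n', 'metadata': {'end_of_log': True}})
print(done)  # True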

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
new file mode 100644
index 0000000..bc10ef7
--- /dev/null
+++ b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
@@ -0,0 +1 @@
+Log for testing.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
deleted file mode 100644
index bc10ef7..0000000
--- a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
+++ /dev/null
@@ -1 +0,0 @@
-Log for testing.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 3b2892d..08d72be 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -364,7 +364,6 @@ class TestLogView(unittest.TestCase):
 
     def tearDown(self):
         logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
-        dagbag = models.DagBag(settings.DAGS_FOLDER)
         self.session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.DAG_ID and
             TaskInstance.task_id == self.TASK_ID and
@@ -400,6 +399,22 @@ class TestLogView(unittest.TestCase):
 
         self.assertIn('"message":', response.data.decode('utf-8'))
         self.assertIn('"metadata":', response.data.decode('utf-8'))
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
+    def test_get_logs_with_null_metadata(self):
+        url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}&metadata=null"
+        response = \
+            self.app.get(url_template.format(self.DAG_ID,
+                                             self.TASK_ID,
+                                             quote_plus(self.DEFAULT_DATE.isoformat()),
+                                             1))
+
+        self.assertIn('"message":', response.data.decode('utf-8'))
+        self.assertIn('"metadata":', response.data.decode('utf-8'))
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
         self.assertEqual(200, response.status_code)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
new file mode 100644
index 0000000..bc10ef7
--- /dev/null
+++ b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
@@ -0,0 +1 @@
+Log for testing.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
deleted file mode 100644
index bc10ef7..0000000
--- a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
+++ /dev/null
@@ -1 +0,0 @@
-Log for testing.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index bbec4ac..f6fb24b 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,14 +17,27 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import copy
 import io
+import json
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
 import urllib
-from werkzeug.test import Client
+
 from flask._compat import PY2
 from flask_appbuilder.security.sqla.models import User as ab_user
-from airflow import models
+from urllib.parse import quote_plus
+from werkzeug.test import Client
+
 from airflow import configuration as conf
+from airflow import models
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
 from airflow.utils import timezone
 from airflow.utils.state import State
@@ -400,6 +413,102 @@ class TestConfigurationView(TestBase):
             ['Airflow Configuration', 'Running Configuration'], resp)
 
 
+class TestLogView(TestBase):
+    DAG_ID = 'dag_for_testing_log_view'
+    TASK_ID = 'task_for_testing_log_view'
+    DEFAULT_DATE = timezone.datetime(2017, 9, 1)
+    ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&' \
+               'execution_date={execution_date}'.format(dag_id=DAG_ID,
+                                                        task_id=TASK_ID,
+                                                        execution_date=DEFAULT_DATE)
+
+    def setUp(self):
+        conf.load_test_config()
+
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        logging_config['handlers']['task']['base_log_folder'] = os.path.normpath(
+            os.path.join(current_dir, 'test_logs'))
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+        conf.set('core', 'logging_config_class', 'airflow_local_settings.LOGGING_CONFIG')
+
+        self.app, self.appbuilder = application.create_app(testing=True)
+        self.app.config['WTF_CSRF_ENABLED'] = False
+        self.client = self.app.test_client()
+        self.login()
+        self.session = Session()
+
+        from airflow.www_rbac.views import dagbag
+        dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
+        task = DummyOperator(task_id=self.TASK_ID, dag=dag)
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        ti = TaskInstance(task=task, execution_date=self.DEFAULT_DATE)
+        ti.try_number = 1
+        self.session.merge(ti)
+        self.session.commit()
+
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        self.clear_table(TaskInstance)
+
+        shutil.rmtree(self.settings_folder)
+        conf.set('core', 'logging_config_class', '')
+
+        self.logout()
+        super(TestLogView, self).tearDown()
+
+    def test_get_file_task_log(self):
+        response = self.client.get(
+            TestLogView.ENDPOINT,
+            follow_redirects=True,
+        )
+        self.assertEqual(response.status_code, 200)
+        self.assertIn('Log by attempts',
+                      response.data.decode('utf-8'))
+
+    def test_get_logs_with_metadata(self):
+        url_template = "get_logs_with_metadata?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}&metadata={}"
+        response = \
+            self.client.get(url_template.format(self.DAG_ID,
+                                                self.TASK_ID,
+                                                quote_plus(self.DEFAULT_DATE.isoformat()),
+                                                1,
+                                                json.dumps({})), follow_redirects=True)
+
+        self.assertIn('"message":', response.data.decode('utf-8'))
+        self.assertIn('"metadata":', response.data.decode('utf-8'))
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
+    def test_get_logs_with_null_metadata(self):
+        url_template = "get_logs_with_metadata?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}&metadata=null"
+        response = \
+            self.client.get(url_template.format(self.DAG_ID,
+                                                self.TASK_ID,
+                                                quote_plus(self.DEFAULT_DATE.isoformat()),
+                                                1), follow_redirects=True)
+
+        self.assertIn('"message":', response.data.decode('utf-8'))
+        self.assertIn('"metadata":', response.data.decode('utf-8'))
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+        self.assertEqual(200, response.status_code)
+
+
 class TestVersionView(TestBase):
     def test_version(self):
         resp = self.client.get('version', follow_redirects=True)
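
The setUp above rewrites the task handler's filename_template so colons in
the timestamp are replaced with dots, which is why the test fixture moved to
.../2017-09-01T00.00.00+00.00/1.log once execution dates became timezone
aware. A small sketch of that template rendering with Jinja2 (the FakeTI
stand-in and the literal ts value are assumptions for illustration):

from jinja2 import Template

filename_template = ('{{ ti.dag_id }}/{{ ti.task_id }}/'
                     '{{ ts | replace(":", ".") }}/{{ try_number }}.log')

class FakeTI:
    # Minimal stand-in for a TaskInstance; only the attributes the
    # template touches.
    dag_id = 'dag_for_testing_log_view'
    task_id = 'task_for_testing_log_view'

print(Template(filename_template).render(
    ti=FakeTI, ts='2017-09-01T00:00:00+00:00', try_number=1))
# -> dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log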