You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/29 19:46:38 UTC
incubator-airflow git commit: [AIRFLOW-2402] Fix RBAC task log
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9b661fa61 -> 7c3435442
[AIRFLOW-2402] Fix RBAC task log
Closes #3319 from yrqls21/kevin_yang_fix_rbac_view
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c343544
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c343544
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c343544
Branch: refs/heads/master
Commit: 7c34354427f8c047b2cfb3f472a38bc50fe91d45
Parents: 9b661fa
Author: Kevin Yang <ke...@airbnb.com>
Authored: Tue May 29 20:46:01 2018 +0100
Committer: Kaxil Naik <ka...@apache.org>
Committed: Tue May 29 20:46:01 2018 +0100
----------------------------------------------------------------------
airflow/configuration.py | 4 +-
airflow/www/templates/airflow/ti_log.html | 1 -
airflow/www/views.py | 11 +-
airflow/www_rbac/templates/airflow/ti_log.html | 158 +++++++++++++++----
airflow/www_rbac/views.py | 92 ++++++++---
.../2017-09-01T00.00.00+00.00/1.log | 1 +
.../2017-09-01T00.00.00/1.log | 1 -
tests/www/test_views.py | 21 ++-
.../2017-09-01T00.00.00+00.00/1.log | 1 +
.../2017-09-01T00.00.00/1.log | 1 -
tests/www_rbac/test_views.py | 117 +++++++++++++-
11 files changed, 337 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e19a8b1..2ee453f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -346,7 +346,7 @@ class AirflowConfigParser(ConfigParser):
Note: this is not reversible.
"""
# override any custom settings with defaults
- self.defaults.read_string(parameterized_config(DEFAULT_CONFIG))
+ self.read_string(parameterized_config(DEFAULT_CONFIG))
# then read test config
self.read_string(parameterized_config(TEST_CONFIG))
# then read any "custom" test settings
@@ -446,8 +446,10 @@ if not os.path.isfile(AIRFLOW_CONFIG):
log.info("Reading the config from %s", AIRFLOW_CONFIG)
conf = AirflowConfigParser(default_config=parameterized_config(DEFAULT_CONFIG))
+
conf.read(AIRFLOW_CONFIG)
+
if conf.getboolean('webserver', 'rbac'):
with open(os.path.join(_templates_dir, 'default_webserver_config.py')) as f:
DEFAULT_WEBSERVER_CONFIG = f.read()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
index 2615c45..2b50061 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -41,7 +41,6 @@ limitations under the License.
</div>
{% endblock %}
{% block tail %}
-{{ lib.form_js() }}
{{ super() }}
<script>
// TODO: make those constants configurable.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 4860dbd..eb29b89 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -733,10 +733,12 @@ class Airflow(BaseView):
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
try_number = int(request.args.get('try_number'))
- # metadata may be None
metadata = request.args.get('metadata')
- if metadata:
- metadata = json.loads(metadata)
+ metadata = json.loads(metadata)
+
+ # metadata may be null
+ if not metadata:
+ metadata = {}
# Convert string datetime into actual datetime
try:
@@ -779,9 +781,6 @@ class Airflow(BaseView):
.format(task_log_reader, str(e))]
metadata['end_of_log'] = True
return jsonify(message=error_message, error=True, metadata=metadata)
- except AirflowException as e:
- metadata['end_of_log'] = True
- return jsonify(message=str(e), error=True, metadata=metadata)
@expose('/log')
@login_required
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/templates/airflow/ti_log.html b/airflow/www_rbac/templates/airflow/ti_log.html
index 79aee89..c873f67 100644
--- a/airflow/www_rbac/templates/airflow/ti_log.html
+++ b/airflow/www_rbac/templates/airflow/ti_log.html
@@ -1,40 +1,140 @@
{#
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
#}
{% extends "airflow/task_instance.html" %}
{% block title %}Airflow - DAGs{% endblock %}
{% block content %}
- {{ super() }}
- <h4>{{ title }}</h4>
- <ul class="nav nav-pills" role="tablist">
- {% for log in logs %}
- <li role="presentation" class="{{ 'active' if loop.last else '' }}">
- <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
- {{ loop.index }}
- </a>
- </li>
- {% endfor %}
- </ul>
- <div class="tab-content">
- {% for log in logs %}
- <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
- <pre id="attempt-{{ loop.index }}">{{ log }}</pre>
- </div>
- {% endfor %}
+{{ super() }}
+<h4>{{ title }}</h4>
+<ul class="nav nav-pills" role="tablist">
+ {% for log in logs %}
+ <li role="presentation" class="{{ 'active' if loop.last else '' }}">
+ <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
+ {{ loop.index }}
+ </a>
+ </li>
+ {% endfor %}
+</ul>
+<div class="tab-content">
+ {% for log in logs %}
+ <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
+ <img id="loading-{{ loop.index }}" style="margin-top:0%; margin-left:50%; height:50px; width:50px; position: absolute;"
+ alt="spinner" src="{{ url_for('static', filename='loading.gif') }}">
+ <pre><code id="try-{{ loop.index }}">{{ log }}</code></pre>
</div>
+ {% endfor %}
+ </div>
+{% endblock %}
+{% block tail %}
+{{ super() }}
+<script>
+ // TODO: make those constants configurable.
+ // Time interval to wait before next log fetching. Default 2s.
+ const DELAY = 2e3;
+ // Distance away from page bottom to enable auto tailing.
+ const AUTO_TAILING_OFFSET = 30;
+ // Animation speed for auto tailing log display.
+ const ANIMATION_SPEED = 1000;
+ // Total number of tabs to show.
+ const TOTAL_ATTEMPTS = "{{ logs|length }}";
+
+ // Resolve after `delay` ms; used to pace the recursive log fetches from the Flask endpoint.
+ function recurse(delay=DELAY) {
+ return new Promise((resolve) => setTimeout(resolve, delay));
+ }
+
+ // Enable auto tailing only when users scroll down to the bottom
+ // of the page. This prevents auto tailing the page if users want
+ // to view earlier rendered messages.
+ function checkAutoTailingCondition() {
+ const docHeight = $(document).height();
+ console.debug($(window).scrollTop())
+ console.debug($(window).height())
+ console.debug($(document).height())
+ return $(window).scrollTop() != 0
+ && ($(window).scrollTop() + $(window).height() > docHeight - AUTO_TAILING_OFFSET);
+ }
+
+ // Streaming log with auto-tailing.
+ function autoTailingLog(try_number, metadata=null, auto_tailing=false) {
+ console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \
+ execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata));
+
+ return Promise.resolve(
+ $.ajax({
+ url: "{{ url_for("Airflow.get_logs_with_metadata") }}",
+ data: {
+ dag_id: "{{ dag_id }}",
+ task_id: "{{ task_id }}",
+ execution_date: "{{ execution_date }}",
+ try_number: try_number,
+ metadata: JSON.stringify(metadata),
+ },
+ })).then(res => {
+ // Stop recursive call to backend when error occurs.
+ if (!res) {
+ document.getElementById("loading-"+try_number).style.display = "none";
+ return;
+ }
+ // res.error is a boolean
+ // res.message is the log itself or the error message
+ if (res.error) {
+ if (res.message) {
+ console.error("Error while retrieving log: " + res.message);
+ }
+ document.getElementById("loading-"+try_number).style.display = "none";
+ return;
+ }
+
+ if (res.message) {
+ // Auto scroll window to the end if current window location is near the end.
+ if(auto_tailing && checkAutoTailingCondition()) {
+ var should_scroll = true
+ }
+ // The message may contain HTML, so either have to escape it or write it as text.
+ document.getElementById(`try-${try_number}`).textContent += res.message + "\n";
+ // Auto scroll window to the end if current window location is near the end.
+ if(should_scroll) {
+ $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED);
+ }
+ }
+
+ if (res.metadata.end_of_log) {
+ document.getElementById("loading-"+try_number).style.display = "none";
+ return;
+ }
+ return recurse().then(() => autoTailingLog(
+ try_number, res.metadata, auto_tailing));
+ });
+ }
+ $(document).ready(function() {
+ // Lazily load all past task instance logs.
+ // TODO: We only need to have recursive queries for
+ // latest running task instances. Currently it does not
+ // work well with ElasticSearch because ES query only
+ // returns at most 10k documents. We want the ability
+ // to display all logs in the front-end.
+ // An optimization here is to render from latest attempt.
+ for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
+ // Only auto-tail the page when streaming the latest attempt.
+ autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS));
+ }
+ });
+
+</script>
{% endblock %}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index f508720..ebd1005 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -38,7 +38,7 @@ from sqlalchemy import or_, desc, and_, union_all
from flask import (
g, redirect, request, Markup, Response, render_template,
- make_response, flash)
+ make_response, flash, jsonify)
from flask._compat import PY2
from flask_appbuilder import BaseView, ModelView, expose, has_access
@@ -417,6 +417,65 @@ class Airflow(AirflowBaseView):
form=form,
title=title, )
+ @expose('/get_logs_with_metadata')
+ @has_access
+ @action_logging
+ @provide_session
+ def get_logs_with_metadata(self, session=None):
+ dag_id = request.args.get('dag_id')
+ task_id = request.args.get('task_id')
+ execution_date = request.args.get('execution_date')
+ dttm = pendulum.parse(execution_date)
+ try_number = int(request.args.get('try_number'))
+ metadata = request.args.get('metadata')
+ metadata = json.loads(metadata)
+
+ # metadata may be null
+ if not metadata:
+ metadata = {}
+
+ # Convert string datetime into actual datetime
+ try:
+ execution_date = timezone.parse(execution_date)
+ except ValueError:
+ error_message = (
+ 'Given execution date, {}, could not be identified '
+ 'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+ execution_date))
+ response = jsonify({'error': error_message})
+ response.status_code = 400
+
+ return response
+
+ logger = logging.getLogger('airflow.task')
+ task_log_reader = conf.get('core', 'task_log_reader')
+ handler = next((handler for handler in logger.handlers
+ if handler.name == task_log_reader), None)
+
+ ti = session.query(models.TaskInstance).filter(
+ models.TaskInstance.dag_id == dag_id,
+ models.TaskInstance.task_id == task_id,
+ models.TaskInstance.execution_date == dttm).first()
+ try:
+ if ti is None:
+ logs = ["*** Task instance did not exist in the DB\n"]
+ metadata['end_of_log'] = True
+ else:
+ dag = dagbag.get_dag(dag_id)
+ ti.task = dag.get_task(ti.task_id)
+ logs, metadatas = handler.read(ti, try_number, metadata=metadata)
+ metadata = metadatas[0]
+ for i, log in enumerate(logs):
+ if PY2 and not isinstance(log, unicode):
+ logs[i] = log.decode('utf-8')
+ message = logs[0]
+ return jsonify(message=message, metadata=metadata)
+ except AttributeError as e:
+ error_message = ["Task log handler {} does not support read logs.\n{}\n"
+ .format(task_log_reader, str(e))]
+ metadata['end_of_log'] = True
+ return jsonify(message=error_message, error=True, metadata=metadata)
+
@expose('/log')
@has_access
@action_logging
@@ -428,34 +487,17 @@ class Airflow(AirflowBaseView):
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
- ti = (
- session.query(models.TaskInstance)
- .filter(models.TaskInstance.dag_id == dag_id,
- models.TaskInstance.task_id == task_id,
- models.TaskInstance.execution_date == dttm)
- .first()
- )
- if ti is None:
- logs = ["*** Task instance did not exist in the DB\n"]
- else:
- logger = logging.getLogger('airflow.task')
- task_log_reader = conf.get('core', 'task_log_reader')
- handler = next((handler for handler in logger.handlers
- if handler.name == task_log_reader), None)
- try:
- ti.task = dag.get_task(ti.task_id)
- logs = handler.read(ti)
- except AttributeError as e:
- logs = ["Task log handler {} does not support read logs.\n{}\n"
- .format(task_log_reader, str(e))]
- for i, log in enumerate(logs):
- if PY2 and not isinstance(log, unicode):
- logs[i] = log.decode('utf-8')
+ ti = session.query(models.TaskInstance).filter(
+ models.TaskInstance.dag_id == dag_id,
+ models.TaskInstance.task_id == task_id,
+ models.TaskInstance.execution_date == dttm).first()
+ logs = [''] * (ti.next_try_number - 1 if ti is not None else 0)
return self.render(
'airflow/ti_log.html',
- logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
+ logs=logs, dag=dag, title="Log by attempts",
+ dag_id=dag.dag_id, task_id=task_id,
execution_date=execution_date, form=form)
@expose('/task')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
new file mode 100644
index 0000000..bc10ef7
--- /dev/null
+++ b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
@@ -0,0 +1 @@
+Log for testing.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
deleted file mode 100644
index bc10ef7..0000000
--- a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
+++ /dev/null
@@ -1 +0,0 @@
-Log for testing.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 3b2892d..08d72be 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -364,7 +364,6 @@ class TestLogView(unittest.TestCase):
def tearDown(self):
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
- dagbag = models.DagBag(settings.DAGS_FOLDER)
self.session.query(TaskInstance).filter(
TaskInstance.dag_id == self.DAG_ID and
TaskInstance.task_id == self.TASK_ID and
@@ -400,6 +399,22 @@ class TestLogView(unittest.TestCase):
self.assertIn('"message":', response.data.decode('utf-8'))
self.assertIn('"metadata":', response.data.decode('utf-8'))
+ self.assertIn('Log for testing.', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
+ def test_get_logs_with_null_metadata(self):
+ url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \
+ "task_id={}&execution_date={}&" \
+ "try_number={}&metadata=null"
+ response = \
+ self.app.get(url_template.format(self.DAG_ID,
+ self.TASK_ID,
+ quote_plus(self.DEFAULT_DATE.isoformat()),
+ 1))
+
+ self.assertIn('"message":', response.data.decode('utf-8'))
+ self.assertIn('"metadata":', response.data.decode('utf-8'))
+ self.assertIn('Log for testing.', response.data.decode('utf-8'))
self.assertEqual(200, response.status_code)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
new file mode 100644
index 0000000..bc10ef7
--- /dev/null
+++ b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log
@@ -0,0 +1 @@
+Log for testing.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
deleted file mode 100644
index bc10ef7..0000000
--- a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log
+++ /dev/null
@@ -1 +0,0 @@
-Log for testing.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index bbec4ac..f6fb24b 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,14 +17,27 @@
# specific language governing permissions and limitations
# under the License.
+import copy
import io
+import json
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
import unittest
import urllib
-from werkzeug.test import Client
+
from flask._compat import PY2
from flask_appbuilder.security.sqla.models import User as ab_user
-from airflow import models
+from urllib.parse import quote_plus
+from werkzeug.test import Client
+
from airflow import configuration as conf
+from airflow import models
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State
@@ -400,6 +413,102 @@ class TestConfigurationView(TestBase):
['Airflow Configuration', 'Running Configuration'], resp)
+class TestLogView(TestBase):
+ DAG_ID = 'dag_for_testing_log_view'
+ TASK_ID = 'task_for_testing_log_view'
+ DEFAULT_DATE = timezone.datetime(2017, 9, 1)
+ ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&' \
+ 'execution_date={execution_date}'.format(dag_id=DAG_ID,
+ task_id=TASK_ID,
+ execution_date=DEFAULT_DATE)
+
+ def setUp(self):
+ conf.load_test_config()
+
+ # Create a custom logging configuration
+ logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ logging_config['handlers']['task']['base_log_folder'] = os.path.normpath(
+ os.path.join(current_dir, 'test_logs'))
+ logging_config['handlers']['task']['filename_template'] = \
+ '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+ '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+ # Write the custom logging configuration to a file
+ self.settings_folder = tempfile.mkdtemp()
+ settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+ new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+ with open(settings_file, 'w') as handle:
+ handle.writelines(new_logging_file)
+ sys.path.append(self.settings_folder)
+ conf.set('core', 'logging_config_class', 'airflow_local_settings.LOGGING_CONFIG')
+
+ self.app, self.appbuilder = application.create_app(testing=True)
+ self.app.config['WTF_CSRF_ENABLED'] = False
+ self.client = self.app.test_client()
+ self.login()
+ self.session = Session()
+
+ from airflow.www_rbac.views import dagbag
+ dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
+ task = DummyOperator(task_id=self.TASK_ID, dag=dag)
+ dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+ ti = TaskInstance(task=task, execution_date=self.DEFAULT_DATE)
+ ti.try_number = 1
+ self.session.merge(ti)
+ self.session.commit()
+
+ def tearDown(self):
+ logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+ self.clear_table(TaskInstance)
+
+ shutil.rmtree(self.settings_folder)
+ conf.set('core', 'logging_config_class', '')
+
+ self.logout()
+ super(TestLogView, self).tearDown()
+
+ def test_get_file_task_log(self):
+ response = self.client.get(
+ TestLogView.ENDPOINT,
+ follow_redirects=True,
+ )
+ self.assertEqual(response.status_code, 200)
+ self.assertIn('Log by attempts',
+ response.data.decode('utf-8'))
+
+ def test_get_logs_with_metadata(self):
+ url_template = "get_logs_with_metadata?dag_id={}&" \
+ "task_id={}&execution_date={}&" \
+ "try_number={}&metadata={}"
+ response = \
+ self.client.get(url_template.format(self.DAG_ID,
+ self.TASK_ID,
+ quote_plus(self.DEFAULT_DATE.isoformat()),
+ 1,
+ json.dumps({})), follow_redirects=True)
+
+ self.assertIn('"message":', response.data.decode('utf-8'))
+ self.assertIn('"metadata":', response.data.decode('utf-8'))
+ self.assertIn('Log for testing.', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
+ def test_get_logs_with_null_metadata(self):
+ url_template = "get_logs_with_metadata?dag_id={}&" \
+ "task_id={}&execution_date={}&" \
+ "try_number={}&metadata=null"
+ response = \
+ self.client.get(url_template.format(self.DAG_ID,
+ self.TASK_ID,
+ quote_plus(self.DEFAULT_DATE.isoformat()),
+ 1), follow_redirects=True)
+
+ self.assertIn('"message":', response.data.decode('utf-8'))
+ self.assertIn('"metadata":', response.data.decode('utf-8'))
+ self.assertIn('Log for testing.', response.data.decode('utf-8'))
+ self.assertEqual(200, response.status_code)
+
+
class TestVersionView(TestBase):
def test_version(self):
resp = self.client.get('version', follow_redirects=True)