You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/04 04:41:00 UTC

[GitHub] feng-tao closed pull request #4425: [AIRFLOW-3623] Support download logs by attempts from UI

feng-tao closed pull request #4425: [AIRFLOW-3623] Support download logs by attempts from UI
URL: https://github.com/apache/incubator-airflow/pull/4425
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise not display it once the PR is merged):

diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 5f8c88879c..185503e8e1 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -297,3 +297,23 @@ def parse_template_string(template_string):
         return None, Template(template_string)
     else:
         return template_string, None
+
+
+def render_log_filename(ti, try_number, filename_template):
+    """
+    Given task instance, try_number, filename_template, return the rendered log filename
+
+    :param ti: task instance
+    :param try_number: try_number of the task
+    :param filename_template: filename template, which can be jinja template or python string template
+    """
+    filename_template, filename_jinja_template = parse_template_string(filename_template)
+    if filename_jinja_template:
+        jinja_context = ti.get_template_context()
+        jinja_context['try_number'] = try_number
+        return filename_jinja_template.render(**jinja_context)
+
+    return filename_template.format(dag_id=ti.dag_id,
+                                    task_id=ti.task_id,
+                                    execution_date=ti.execution_date.isoformat(),
+                                    try_number=try_number)
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 664a6f3120..3408de7dc8 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -146,6 +146,12 @@ <h4 class="modal-title" id="myModalLabel">
             View Log
           </button>
           <hr/>
+          <div>
+            <label style="display:inline"> Download Log (by attempts): </label>
+            <ul class="nav nav-pills" role="tablist" id="try_index" style="display:inline">
+            </ul>
+          </div>
+          <hr/>
           <button id="btn_run" type="button" class="btn btn-primary"
             title="Runs a single task instance">
             Run
@@ -302,7 +308,7 @@ <h4 class="modal-title" id="dagModalLabel">
     var task_id = '';
     var exection_date = '';
     var subdag_id = '';
-    function call_modal(t, d, sd) {
+    function call_modal(t, d, try_numbers, sd) {
       task_id = t;
       loc = String(window.location);
       $("#btn_filter").on("click", function(){
@@ -314,12 +320,35 @@ <h4 class="modal-title" id="dagModalLabel">
       $('#execution_date').html(d);
       $('#myModal').modal({});
       $("#myModal").css("margin-top","0px")
-        if (subdag_id===undefined)
-            $("#div_btn_subdag").hide();
-        else {
-            $("#div_btn_subdag").show();
-            subdag_id = "{{ dag.dag_id }}."+t;
+      if (subdag_id === undefined)
+          $("#div_btn_subdag").hide();
+      else {
+          $("#div_btn_subdag").show();
+          subdag_id = "{{ dag.dag_id }}."+t;
+      }
+
+      $("#try_index > li").remove();
+      var startIndex = (try_numbers > 2 ? 0 : 1);
+      for (var index = startIndex; index <  try_numbers; index++) {
+        var url = "{{ url_for('airflow.get_logs_with_metadata') }}" +
+          "?dag_id=" + encodeURIComponent(dag_id) +
+          "&task_id=" + encodeURIComponent(task_id) +
+          "&execution_date=" + encodeURIComponent(execution_date) +
+          "&metadata=null" +
+          "&format=file";
+
+        var showLabel = index;
+        if (index != 0) {
+          url += "&try_number=" + index;
+        } else {
+          showLabel = 'All';
         }
+
+        $("#try_index").append(`<li role="presentation" style="display:inline">
+          <a href="${url}"> ${showLabel} </a>
+          </li>`
+        );
+      }
     }
 
     function call_modal_dag(dag) {
@@ -355,7 +384,8 @@ <h4 class="modal-title" id="dagModalLabel">
       url = "{{ url_for('airflow.log') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
         "&dag_id=" + encodeURIComponent(dag_id) +
-        "&execution_date=" + encodeURIComponent(execution_date);
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&format=json";
       window.location = url;
     });
 
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index f20127ceb3..abdc337c83 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -233,9 +233,9 @@
         if(d.task_id === undefined)
             call_modal_dag(d);
         else if(nodeobj[d.task_id].operator=='SubDagOperator')
-            call_modal(d.task_id, d.execution_date, true);
+            call_modal(d.task_id, d.execution_date, d.try_number, true);
         else
-            call_modal(d.task_id, d.execution_date);
+            call_modal(d.task_id, d.execution_date, d.try_number);
       })
       .attr("class", function(d) {return "state " + d.state})
       .attr("data-toggle", "tooltip")
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bc8e392b3a..c3866c04c7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -40,7 +40,7 @@
 import sqlalchemy as sqla
 from flask import (
     abort, jsonify, redirect, url_for, request, Markup, Response,
-    current_app, render_template, make_response)
+    current_app, render_template, make_response, send_file)
 from flask import flash
 from flask._compat import PY2
 from flask_admin import BaseView, expose, AdminIndexView
@@ -75,7 +75,7 @@
 from airflow.utils import timezone
 from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
 from airflow.utils.db import create_session, provide_session
-from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils.helpers import alchemy_to_dict, render_log_filename
 from airflow.utils.json import json_ser
 from airflow.utils.net import get_hostname
 from airflow.utils.state import State
@@ -84,6 +84,10 @@
 from airflow.www.forms import (DateTimeForm, DateTimeWithNumRunsForm,
                                DateTimeWithNumRunsWithDagRunsForm)
 from airflow.www.validators import GreaterEqualThan
+if PY2:
+    from cStringIO import StringIO
+else:
+    from io import StringIO
 
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000
@@ -770,7 +774,12 @@ def get_logs_with_metadata(self, session=None):
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
         dttm = pendulum.parse(execution_date)
-        try_number = int(request.args.get('try_number'))
+        if request.args.get('try_number') is not None:
+            try_number = int(request.args.get('try_number'))
+        else:
+            try_number = None
+        response_format = request.args.get('format', 'json')
+
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
 
@@ -812,8 +821,16 @@ def get_logs_with_metadata(self, session=None):
             for i, log in enumerate(logs):
                 if PY2 and not isinstance(log, unicode):
                     logs[i] = log.decode('utf-8')
-            message = logs[0]
-            return jsonify(message=message, metadata=metadata)
+
+            if response_format == 'json':
+                message = logs[0] if try_number is not None else logs
+                return jsonify(message=message, metadata=metadata)
+
+            file_obj = StringIO('\n'.join(logs))
+            filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+            attachment_filename = render_log_filename(ti, try_number, filename_template)
+            return send_file(file_obj, as_attachment=True,
+                             attachment_filename=attachment_filename)
         except AttributeError as e:
             error_message = ["Task log handler {} does not support read logs.\n{}\n"
                              .format(task_log_reader, str(e))]
diff --git a/airflow/www_rbac/templates/airflow/dag.html b/airflow/www_rbac/templates/airflow/dag.html
index 69b16a7666..be3243d8bd 100644
--- a/airflow/www_rbac/templates/airflow/dag.html
+++ b/airflow/www_rbac/templates/airflow/dag.html
@@ -146,6 +146,12 @@ <h4 class="modal-title" id="myModalLabel">
             View Log
           </button>
           <hr/>
+          <div>
+            <label style="display:inline"> Download Log (by attempts): </label>
+            <ul class="nav nav-pills" role="tablist" id="try_index" style="display:inline">
+            </ul>
+          </div>
+          <hr/>
           <button id="btn_run" type="button" class="btn btn-primary"
             title="Runs a single task instance">
             Run
@@ -302,7 +308,7 @@ <h4 class="modal-title" id="dagModalLabel">
     var task_id = '';
     var exection_date = '';
     var subdag_id = '';
-    function call_modal(t, d, sd) {
+    function call_modal(t, d, try_numbers, sd) {
       task_id = t;
       loc = String(window.location);
       $("#btn_filter").on("click", function(){
@@ -314,12 +320,35 @@ <h4 class="modal-title" id="dagModalLabel">
       $('#execution_date').html(d);
       $('#myModal').modal({});
       $("#myModal").css("margin-top","0px")
-        if (subdag_id===undefined)
-            $("#div_btn_subdag").hide();
-        else {
-            $("#div_btn_subdag").show();
-            subdag_id = "{{ dag.dag_id }}."+t;
+      if (subdag_id === undefined)
+          $("#div_btn_subdag").hide();
+      else {
+          $("#div_btn_subdag").show();
+          subdag_id = "{{ dag.dag_id }}."+t;
+      }
+
+      $("#try_index > li").remove();
+      var startIndex = (try_numbers > 2 ? 0 : 1)
+      for (var index = startIndex; index <  try_numbers; index++) {
+        var url = "{{ url_for('Airflow.get_logs_with_metadata') }}" +
+          "?dag_id=" + encodeURIComponent(dag_id) +
+          "&task_id=" + encodeURIComponent(task_id) +
+          "&execution_date=" + encodeURIComponent(execution_date) +
+          "&metadata=null" +
+          "&format=file";
+
+        var showLabel = index;
+        if (index != 0) {
+          url += "&try_number=" + index;
+        } else {
+          showLabel = 'All';
         }
+
+        $("#try_index").append(`<li role="presentation" style="display:inline">
+          <a href="${url}"> ${showLabel} </a>
+          </li>`
+        );
+      }
     }
 
     function call_modal_dag(dag) {
diff --git a/airflow/www_rbac/templates/airflow/tree.html b/airflow/www_rbac/templates/airflow/tree.html
index da886317ed..0fe09c1b56 100644
--- a/airflow/www_rbac/templates/airflow/tree.html
+++ b/airflow/www_rbac/templates/airflow/tree.html
@@ -234,9 +234,9 @@
         if(d.task_id === undefined)
             call_modal_dag(d);
         else if(nodeobj[d.task_id].operator=='SubDagOperator')
-            call_modal(d.task_id, d.execution_date, true);
+            call_modal(d.task_id, d.execution_date, d.try_number, true);
         else
-            call_modal(d.task_id, d.execution_date);
+            call_modal(d.task_id, d.execution_date, d.try_number);
       })
       .attr("class", function(d) {return "state " + d.state})
       .attr("data-toggle", "tooltip")
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 3177a5540b..e91703ee4b 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -36,7 +36,7 @@
 import sqlalchemy as sqla
 from flask import (
     redirect, request, Markup, Response, render_template,
-    make_response, flash, jsonify)
+    make_response, flash, jsonify, send_file)
 from flask._compat import PY2
 from flask_appbuilder import BaseView, ModelView, expose, has_access
 from flask_appbuilder.actions import action
@@ -60,7 +60,7 @@
 from airflow.utils import timezone
 from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.utils.db import provide_session
-from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils.helpers import alchemy_to_dict, render_log_filename
 from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.www_rbac import utils as wwwutils
@@ -70,6 +70,10 @@
                                     DateTimeWithNumRunsWithDagRunsForm,
                                     DagRunForm, ConnectionForm)
 from airflow.www_rbac.widgets import AirflowModelListWidget
+if PY2:
+    from cStringIO import StringIO
+else:
+    from io import StringIO
 
 
 PAGE_SIZE = conf.getint('webserver', 'page_size')
@@ -509,9 +513,13 @@ def get_logs_with_metadata(self, session=None):
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
         dttm = pendulum.parse(execution_date)
-        try_number = int(request.args.get('try_number'))
+        if request.args.get('try_number') is not None:
+            try_number = int(request.args.get('try_number'))
+        else:
+            try_number = None
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
+        response_format = request.args.get('format', 'json')
 
         # metadata may be null
         if not metadata:
@@ -551,8 +559,16 @@ def get_logs_with_metadata(self, session=None):
             for i, log in enumerate(logs):
                 if PY2 and not isinstance(log, unicode):
                     logs[i] = log.decode('utf-8')
-            message = logs[0]
-            return jsonify(message=message, metadata=metadata)
+
+            if response_format == 'json':
+                message = logs[0] if try_number is not None else logs
+                return jsonify(message=message, metadata=metadata)
+
+            file_obj = StringIO('\n'.join(logs))
+            filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+            attachment_filename = render_log_filename(ti, try_number, filename_template)
+            return send_file(file_obj, as_attachment=True,
+                             attachment_filename=attachment_filename)
         except AttributeError as e:
             error_message = ["Task log handler {} does not support read logs.\n{}\n"
                              .format(task_log_reader, str(e))]
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 837a79acba..ec7fc58d3a 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -29,8 +29,9 @@
 import six
 
 from airflow import DAG
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils import helpers
+from airflow.models import TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
 
 
 class TestHelpers(unittest.TestCase):
@@ -62,6 +63,28 @@ def signal_handler(signum, frame):
         while True:
             time.sleep(1)
 
+    def test_render_log_filename(self):
+        try_number = 1
+        dag_id = 'test_render_log_filename_dag'
+        task_id = 'test_render_log_filename_task'
+        execution_date = datetime(2016, 1, 1)
+
+        dag = DAG(dag_id, start_date=execution_date)
+        task = DummyOperator(task_id=task_id, dag=dag)
+        ti = TaskInstance(task=task, execution_date=execution_date)
+
+        filename_template = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
+
+        ts = ti.get_template_context()['ts']
+        expected_filename = "{dag_id}/{task_id}/{ts}/{try_number}.log".format(dag_id=dag_id,
+                                                                              task_id=task_id,
+                                                                              ts=ts,
+                                                                              try_number=try_number)
+
+        rendered_filename = helpers.render_log_filename(ti, try_number, filename_template)
+
+        self.assertEqual(rendered_filename, expected_filename)
+
     def test_reap_process_group(self):
         """
         Spin up a process that can't be killed by SIGTERM and make sure
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 5446086d18..afe59a61fe 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -384,6 +384,28 @@ def test_get_file_task_log(self):
         self.assertIn('Log by attempts',
                       response.data.decode('utf-8'))
 
+    def test_get_logs_with_metadata_as_download_file(self):
+        url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}&metadata={}&format=file"
+        try_number = 1
+        url = url_template.format(self.DAG_ID,
+                                  self.TASK_ID,
+                                  quote_plus(self.DEFAULT_DATE.isoformat()),
+                                  try_number,
+                                  json.dumps({}))
+        response = self.app.get(url)
+        expected_filename = '{}/{}/{}/{}.log'.format(self.DAG_ID,
+                                                     self.TASK_ID,
+                                                     self.DEFAULT_DATE.isoformat(),
+                                                     try_number)
+
+        content_disposition = response.headers.get('Content-Disposition')
+        self.assertTrue(content_disposition.startswith('attachment'))
+        self.assertTrue(content_disposition.endswith(expected_filename))
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
     def test_get_logs_with_metadata(self):
         url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \
                        "task_id={}&execution_date={}&" \
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 8e5eed69d2..01863f5777 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -556,6 +556,28 @@ def test_get_file_task_log(self):
         self.assertIn('Log by attempts',
                       response.data.decode('utf-8'))
 
+    def test_get_logs_with_metadata_as_download_file(self):
+        url_template = "get_logs_with_metadata?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}&metadata={}&format=file"
+        try_number = 1
+        url = url_template.format(self.DAG_ID,
+                                  self.TASK_ID,
+                                  quote_plus(self.DEFAULT_DATE.isoformat()),
+                                  try_number,
+                                  json.dumps({}))
+        response = self.app.get(url)
+        expected_filename = '{}/{}/{}/{}.log'.format(self.DAG_ID,
+                                                     self.TASK_ID,
+                                                     self.DEFAULT_DATE.isoformat(),
+                                                     try_number)
+
+        content_disposition = response.headers.get('Content-Disposition')
+        self.assertTrue(content_disposition.startswith('attachment'))
+        self.assertTrue(content_disposition.endswith(expected_filename))
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
     def test_get_logs_with_metadata(self):
         url_template = "get_logs_with_metadata?dag_id={}&" \
                        "task_id={}&execution_date={}&" \


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services