Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/17 23:50:59 UTC

[GitHub] stale[bot] closed pull request #2515: [AIRFLOW-1325][WIP] Airflow streaming log backed by ElasticSearch

stale[bot] closed pull request #2515: [AIRFLOW-1325][WIP] Airflow streaming log backed by ElasticSearch
URL: https://github.com/apache/incubator-airflow/pull/2515
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
index d6ae0366d1..3663b35aea 100644
--- a/airflow/config_templates/default_airflow_logging.py
+++ b/airflow/config_templates/default_airflow_logging.py
@@ -73,6 +73,13 @@
             'gcs_log_folder': GCS_LOG_FOLDER,
             'filename_template': FILENAME_TEMPLATE,
         },
+        'es.task': {
+            'class': 'airflow.utils.log.elasticsearch_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+            'host': 'localhost:9200',
+        },
     },
     'loggers': {
         'airflow.task': {
diff --git a/airflow/utils/log/elasticsearch_task_handler.py b/airflow/utils/log/elasticsearch_task_handler.py
new file mode 100644
index 0000000000..df18ba4dd2
--- /dev/null
+++ b/airflow/utils/log/elasticsearch_task_handler.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import AirflowException
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from elasticsearch import Elasticsearch, ElasticsearchException, helpers
+from elasticsearch_dsl import Search
+
+
+class ElasticsearchTaskHandler(FileTaskHandler):
+    """
+    ElasticsearchTaskHandler is a python log handler that
+    reads logs from Elasticsearch. Note logs are not directly
+    indexed into Elasticsearch. Instead, it flushes logs
+    into local files. Additional software setup is required
+    to index the log into Elasticsearch, such as using
+    Filebeat and Logstash.
+
+    To efficiently query and sort Elasticsearch results, we assume each
+    log message has a field `log_id` consists of ti primary keys:
+    `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
+    Log messages with specific log_id are sorted based on `offset`,
+    which is a unique integer indicates log message's order.
+    Timestamp here are unreliable because multiple log messages
+    might have the same timestamp.
+    """
+    def __init__(self, base_log_folder, filename_template,
+                 host='localhost:9200'):
+        """
+        :param base_log_folder: base folder to store logs locally
+        :param filename_template: log filename template
+        :param host: Elasticsearch host name
+        """
+        super(ElasticsearchTaskHandler, self).__init__(
+            base_log_folder, filename_template)
+        self.client = Elasticsearch([host])
+
+    def streaming_read(self, dag_id, task_id, execution_date,
+                  try_number, offset=None, page=0, max_line_per_page=1000):
+        """
+        Endpoint for streaming log.
+        :param dag_id: id of the dag
+        :param task_id: id of the task
+        :param execution_date: execution date in isoformat
+        :param try_number: try_number of the task instance
+        :param offset: only return log messages with an offset strictly greater than this value
+        :param page: page of logs to return
+        :param max_line_per_page: maximum number of results returned per ES query
+        :return: a list of log documents
+        """
+        log_id = '-'.join([dag_id, task_id, execution_date, try_number])
+
+        s = Search(using=self.client) \
+            .query('match', log_id=log_id) \
+            .sort('offset')
+
+        # Offset is the unique key for sorting logs given log_id.
+        if offset:
+            s = s.filter('range', offset={'gt': offset})
+
+        try:
+            response = s[max_line_per_page * page:max_line_per_page * (page + 1)].execute()
+            logs = [hit for hit in response]
+
+        except ElasticsearchException as e:
+            # Do not swallow the ES error.
+            err = "Unable to read logs from ElasticSearch: {}\n".format(str(e))
+            raise AirflowException(err)
+
+        return logs
+
+    def _read(self, ti, try_number):
+        """
+        Read all logs of the given task instance and try_number from Elasticsearch.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        :return: log messages in string format
+        """
+        dag_id = ti.dag_id
+        task_id = ti.task_id
+        execution_date = ti.execution_date.isoformat()
+
+        log_id = '-'.join([dag_id, task_id, execution_date, try_number])
+
+        # Use ES Scroll API to get all logs, since query DSL can at most
+        # return 10k results. This might take some time as sorting scroll
+        # results is very expensive. Please use streaming endpoint to boost
+        # performance.
+        s = Search(using=self.client) \
+            .query('match', log_id=log_id) \
+            .sort('offset')
+        query = s.to_dict()
+        try:
+            response = helpers.scan(self.client, query=query, preserve_order=True)
+            log = '\n'.join([hit['_source']['message'] for hit in response])
+        except ElasticsearchException as e:
+            err = "Unable to read logs from ElasticSearch: {}\n".format(str(e))
+            raise AirflowException(err)
+
+        return log
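
For reference, the document shape the handler above assumes is easiest to see with a concrete example. The sketch below is not part of the diff; the index name `airflow-logs`, the doc type and the sample field values are illustrative assumptions, and only the `log_id`, `offset` and `message` fields are actually required by ElasticsearchTaskHandler:

    from airflow.utils.log.elasticsearch_task_handler import ElasticsearchTaskHandler
    from elasticsearch import Elasticsearch

    # Sketch of the document shape the handler expects. Whatever ships the
    # local log files into Elasticsearch (e.g. Filebeat + Logstash) must
    # produce documents carrying `log_id`, `offset` and `message`.
    doc = {
        'log_id': 'example_dag-example_task-2018-01-01T00:00:00-1',
        'offset': 42,   # strictly increasing per log_id; used for sorting
        'message': '[2018-01-01 00:00:05] INFO - Task started',
    }

    client = Elasticsearch(['localhost:9200'])
    # Index name and doc type are assumptions for this sketch only.
    client.index(index='airflow-logs', doc_type='log', body=doc)

    # Reading the logs back through the handler added in this PR; the log
    # folder and filename template below are placeholder values.
    handler = ElasticsearchTaskHandler('/tmp/airflow/logs', '{{ ti.dag_id }}.log',
                                       host='localhost:9200')
    logs = handler.streaming_read('example_dag', 'example_task',
                                  '2018-01-01T00:00:00', '1', offset=None)
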
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
index 03c0ed3707..f92b756880 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -33,7 +33,7 @@ <h4>{{ title }}</h4>
   <div class="tab-content">
     {% for log in logs %}
       <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
-        <pre id="attempt-{{ loop.index }}">{{ log }}</pre>
+        <pre id="try-{{ loop.index }}">{{ log }}</pre>
       </div>
     {% endfor %}
   </div>
diff --git a/airflow/www/templates/airflow/ti_streaming_log.html b/airflow/www/templates/airflow/ti_streaming_log.html
new file mode 100644
index 0000000000..cdddab40fd
--- /dev/null
+++ b/airflow/www/templates/airflow/ti_streaming_log.html
@@ -0,0 +1,86 @@
+{#
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+#}
+{% extends "airflow/ti_log.html" %}
+
+{% block tail %}
+  {{ lib.form_js() }}
+  {{ super() }}
+  <script>
+    // TODO: make these constants configurable.
+    // Time interval to wait before the next log fetch. Default 2s.
+    const DELAY = 2e3;
+    // Distance away from page bottom to enable auto tailing.
+    const AUTO_TAILING_OFFSET = 30;
+    // Animation speed for auto tailing log display.
+    const ANIMATION_SPEED = 1000;
+    // Total number of tabs to show.
+    const TOTAL_ATTEMPTS = "{{ logs|length }}";
+    // Wait `delay` milliseconds before kicking off the next recursive log fetch.
+    function recurse(delay=DELAY) {
+      return new Promise((resolve) => setTimeout(resolve, delay));
+    }
+    // Enable auto tailing only when users scroll down to the bottom
+    // of the page. This prevents auto tailing the page if users want
+    // to view earlier rendered messages.
+    function checkAutoTailingCondition() {
+      const docHeight = $(document).height();
+      return $(window).scrollTop() + $(window).height() > docHeight - AUTO_TAILING_OFFSET;
+    }
+    // Streaming log with auto-tailing.
+    function autoTailingLog(try_number, offset=null, auto_tailing=false) {
+      return Promise.resolve(
+        $.ajax({
+          url: "{{ url_for("airflow.get_log_js") }}",
+          data: {
+            dag_id: "{{ dag_id }}",
+            task_id: "{{ task_id }}",
+            execution_date: "{{ execution_date }}",
+            try_number: try_number,
+            offset: offset,
+          },
+        })).then(res => {
+          if (res && res.message) {
+            $(`#try-${try_number}`).append(res.message + '\n');
+            // Auto scroll window to the end if current window location is near the end
+            if(auto_tailing && checkAutoTailingCondition()) {
+              $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED);
+            }
+          }
+          // Stop recursive call to backend when error occurs.
+          if (res && res.error) {
+            return;
+          }
+          return recurse().then(() => autoTailingLog(
+            try_number, res.next_offset, auto_tailing));
+        });
+    }
+    $(document).ready(function() {
+      // Lazily load all past task instance logs.
+      // TODO: We only need recursive queries for the
+      // latest running task instances. Currently this does not
+      // work well with Elasticsearch because an ES query only
+      // returns at most 10k documents. We want the ability
+      // to display all logs in the front-end.
+      // An optimization here is to render from the latest attempt.
+      for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
+        // Only auto_tailing the page when streaming the latest attempt.
+        autoTailingLog(i, null, i == TOTAL_ATTEMPTS);
+      }
+    });
+  </script>
+{% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 3bfad5dd61..5709bffe80 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -697,6 +697,31 @@ def rendered(self):
             form=form,
             title=title, )
 
+    @expose('/get_log_js')
+    @login_required
+    @wwwutils.action_logging
+    def get_log_js(self):
+        """ JavaScript endpoint for reading streaming logs. """
+        # TODO: move this logic to API.
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        try_number = request.args.get('try_number')
+        offset = request.args.get('offset')
+
+        logger = logging.getLogger('airflow.task')
+        task_log_reader = conf.get('core', 'task_log_reader')
+        handler = next((handler for handler in logger.handlers
+                        if handler.name == task_log_reader), None)
+        try:
+            logs = handler.streaming_read(dag_id, task_id, execution_date,
+                                          try_number, offset=offset)
+            next_offset = offset if not logs else logs[-1].offset
+            message = '\n'.join([log.message for log in logs])
+            return jsonify(message=message, next_offset=next_offset)
+        except (AttributeError, AirflowException) as e:
+            return jsonify(message=str(e), error=True)
+
     @expose('/log')
     @login_required
     @wwwutils.action_logging
@@ -724,6 +749,8 @@ def log(self):
             except AttributeError as e:
                 logs = ["Task log handler {} does not support read logs.\n{}\n" \
                             .format(task_log_reader, e.message)]
+            except AirflowException as e:
+                logs = ["Unable to read logs: {}".format(str(e))]
 
         for i, log in enumerate(logs):
             if PY2 and not isinstance(log, unicode):
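
To make the contract of the new /get_log_js endpoint concrete, here is a rough polling loop against it (not part of the diff). The base URL and the assumption of an already-authenticated session are placeholders; the query parameters match the request.args lookups in get_log_js, and the loop follows the same message/next_offset/error contract used by the JavaScript above:

    import time
    import requests

    # Hypothetical polling client; assumes a webserver on localhost:8080
    # and that authentication has already been handled.
    URL = 'http://localhost:8080/admin/airflow/get_log_js'
    params = {
        'dag_id': 'example_dag',
        'task_id': 'example_task',
        'execution_date': '2018-01-01T00:00:00',
        'try_number': '1',
    }

    while True:
        res = requests.get(URL, params=params).json()
        if res.get('message'):
            print(res['message'])
        if res.get('error'):
            break                                    # stop polling once the backend reports an error
        params['offset'] = res.get('next_offset')    # only fetch newer lines next time
        time.sleep(2)
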
diff --git a/setup.py b/setup.py
index dedcf76794..f5c56ae17d 100644
--- a/setup.py
+++ b/setup.py
@@ -136,6 +136,10 @@ def check_previous():
 ]
 docker = ['docker-py>=1.6.0']
 druid = ['pydruid>=0.2.1']
+elasticsearch = [
+    'elasticsearch>=5.0.0,<6.0.0',
+    'elasticsearch-dsl>=5.0.0,<6.0.0'
+]
 emr = ['boto3>=1.0.0']
 gcp_api = [
     'httplib2',
@@ -267,6 +271,7 @@ def do_setup():
             'doc': doc,
             'docker': docker,
             'druid': druid,
+            'elasticsearch': elasticsearch,
             'emr': emr,
             'gcp_api': gcp_api,
             'github_enterprise': github_enterprise,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services