Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/10 20:24:38 UTC

[GitHub] rhwang10 closed pull request #4303: [AIRFLOW-3370] Elasticsearch log task handler additional features

rhwang10 closed pull request #4303: [AIRFLOW-3370] Elasticsearch log task handler additional features
URL: https://github.com/apache/incubator-airflow/pull/4303
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 45a2f2923c..2bf9f9d0e8 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -59,6 +59,13 @@
 
 END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')
 
+ELASTICSEARCH_WRITE_STDOUT = conf.get('elasticsearch', 'ELASTICSEARCH_WRITE_STDOUT')
+
+ELASTICSEARCH_JSON_FORMAT = conf.get('elasticsearch', 'ELASTICSEARCH_JSON_FORMAT')
+
+ELASTICSEARCH_RECORD_LABELS = [label.strip() for label in conf.get('elasticsearch',
+                               'ELASTICSEARCH_RECORD_LABELS').split(",")]
+
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,
     'disable_existing_loggers': False,
@@ -191,6 +198,9 @@
             'filename_template': FILENAME_TEMPLATE,
             'end_of_log_mark': END_OF_LOG_MARK,
             'host': ELASTICSEARCH_HOST,
+            'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
+            'json_format': ELASTICSEARCH_JSON_FORMAT,
+            'record_labels': ELASTICSEARCH_RECORD_LABELS,
         },
     },
 }
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a9473178c1..146192051d 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -588,6 +588,9 @@ hide_sensitive_variable_fields = True
 elasticsearch_host =
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
+elasticsearch_write_stdout =
+elasticsearch_json_format =
+elasticsearch_record_labels =
 
 [kubernetes]
 # The repository, tag and imagePullPolicy of the Kubernetes Image for the Worker to Run
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index 16372c0600..e396504969 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -17,8 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
+# Using `from elasticsearch import *` breaks ES mocking in unit tests.
 import elasticsearch
+import json
+import logging
+import sys
+
 import pendulum
 from elasticsearch_dsl import Search
 
@@ -28,6 +32,43 @@
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
+class ParentStdout(object):
+    """
+    Proxy for the original (parent) stdout. Writes are forwarded to
+    sys.__stdout__ so task log output still reaches the worker's stdout.
+    """
+    def __init__(self):
+        self.closed = False
+
+    def write(self, string):
+        sys.__stdout__.write(string)
+
+    def close(self):
+        self.closed = True
+
+
+class JsonFormatter(logging.Formatter):
+    """
+    Custom formatter that renders selected LogRecord fields as JSON.
+    Fields are selected via the record_labels list.
+    TODO: Move RECORD_LABELS into configs/log_config.py
+    """
+    def __init__(self, record_labels, processedTask=None):
+        super(JsonFormatter, self).__init__()
+        self.processedTask = processedTask
+        self.record_labels = record_labels
+
+    def _mergeDictionaries(self, dict_1, dict_2):
+        merged = dict_1.copy()
+        merged.update(dict_2)
+        return merged
+
+    def format(self, record):
+        recordObj = {label: getattr(record, label)
+                     for label in self.record_labels}
+        log_context = self._mergeDictionaries(recordObj, self.processedTask)
+        return json.dumps(log_context)
+
+
 class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
     PAGE = 0
     MAX_LINE_PER_PAGE = 1000
@@ -50,7 +91,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
 
     def __init__(self, base_log_folder, filename_template,
                  log_id_template, end_of_log_mark,
-                 host='localhost:9200'):
+                 write_stdout=None, json_format=None,
+                 record_labels=None, host='localhost:9200'):
         """
         :param base_log_folder: base folder to store logs locally
         :param log_id_template: log id template
@@ -58,7 +100,15 @@ def __init__(self, base_log_folder, filename_template,
         """
         super(ElasticsearchTaskHandler, self).__init__(
             base_log_folder, filename_template)
+
         self.closed = False
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.record_labels = record_labels
+
+        self.handler = None
+        self.taskInstance = None
+        self.writer = None
 
         self.log_id_template, self.log_id_jinja_template = \
             parse_template_string(log_id_template)
@@ -68,12 +118,115 @@ def __init__(self, base_log_folder, filename_template,
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
 
+    def set_context(self, ti):
+        if self.write_stdout:
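+            # Replace this process's stdout with a thin wrapper around the
+            # original stdout so that anything collecting the worker's stdout
+            # (e.g. a log shipper) also receives the task log lines.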
+            self.writer = ParentStdout()
+            sys.stdout = self.writer
+
+            self.taskInstance = self._process_task_instance(ti)
+
+            self.handler = logging.StreamHandler(stream=sys.stdout)
+
+            if self.json_format:
+                self.handler.setFormatter(JsonFormatter(self.record_labels,
+                                                        self.taskInstance))
+            else:
+                self.handler.setFormatter(self.formatter)
+            self.handler.setLevel(self.level)
+
+        else:
+            super(ElasticsearchTaskHandler, self).set_context(ti)
+        self.mark_end_on_close = not ti.raw
+
+    def emit(self, record):
+        if self.write_stdout:
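+            # Note: Formatter.format() populates computed attributes such as
+            # record.message (and record.asctime when the format string uses
+            # it), which JsonFormatter can then read via getattr().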
+            self.formatter.format(record)
+            if self.handler is not None:
+                self.handler.emit(record)
+        else:
+            super(ElasticsearchTaskHandler, self).emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def _process_task_instance(self, ti):
+        """
+        Process task instance information to create a log_id
+        key for Elasticsearch
+        """
+        ti_info = {'dag_id': str(ti.dag_id),
+                   'task_id': str(ti.task_id),
+                   'execution_date': str(ti.execution_date),
+                   'try_number': str(ti.try_number)}
+        return ti_info
+
+    def read(self, task_instance, try_number=None, metadata=None):
+        """
+        Read logs of a given task instance from Elasticsearch.
+        :param task_instance: task instance object
+        :param try_number: task instance try_number to read logs from.
+            If None, it will start at 1.
+        """
+        if self.write_stdout:
+            if try_number is None:
+                next_try = task_instance.next_try_number
+                try_numbers = list(range(1, next_try))
+            elif try_number < 1:
+                logs = [
+                    'Error fetching the logs. '
+                    'Try number {} is invalid.'.format(try_number),
+                ]
+                return logs
+            else:
+                try_numbers = [try_number]
+
+            logs = [''] * len(try_numbers)
+            metadatas = [{}] * len(try_numbers)
+            for i, try_number in enumerate(try_numbers):
+                log, metadata = self._read(task_instance,
+                                           try_number,
+                                           metadata)
+
+                # If a log is present, we don't want to keep checking:
+                # set end_of_log to True, set mark_end_on_close to False
+                # and return the log and metadata. This stops the
+                # auto-tailing recursion in the ti_log.html script from
+                # repeatedly querying Elasticsearch once we've fetched
+                # what we're looking for.
+                if log:
+                    logs[i] += log
+                    metadata['end_of_log'] = True
+                    self.mark_end_on_close = False
+                    metadatas[i] = metadata
+                else:
+                    metadata['end_of_log'] = False
+                    metadatas[i] = metadata
+
+            return logs, metadatas
+        else:
+            return super(ElasticsearchTaskHandler, self) \
+                .read(task_instance, try_number, metadata)
+
     def _render_log_id(self, ti, try_number):
+        # Using Jinja2 templating
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
 
+        # Make the log_id Elasticsearch-query friendly when writing to stdout
+        if self.write_stdout:
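+            # e.g. 'my_dag-my_task-2016-01-01T00:00:00+00:00-1' becomes
+            # 'my_dag_my_task_2016_01_01T00_00_00_00_00_1' (illustrative ids)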
+            return self.log_id_template.format(dag_id=ti.dag_id,
+                                               task_id=ti.task_id,
+                                               execution_date=ti
+                                               .execution_date.isoformat(),
+                                               try_number=try_number) \
+                                       .replace(":", "_") \
+                                       .replace("-", "_") \
+                                       .replace("+", "_")
+
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
                                            execution_date=ti
@@ -132,7 +285,6 @@ def es_read(self, log_id, offset):
         :param offset: the offset start to read log from.
         :type offset: str
         """
-
         # Offset is the unique key for sorting logs given log_id.
         s = Search(using=self.client) \
             .query('match', log_id=log_id) \
@@ -143,9 +295,8 @@ def es_read(self, log_id, offset):
         logs = []
         if s.count() != 0:
             try:
-
-                logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \
-                    .execute()
+                logs = s[self.MAX_LINE_PER_PAGE *
+                         self.PAGE:self.MAX_LINE_PER_PAGE].execute()
             except Exception as e:
                 msg = 'Could not read log with log_id: {}, ' \
                       'error: {}'.format(log_id, str(e))
@@ -153,10 +304,6 @@ def es_read(self, log_id, offset):
 
         return logs
 
-    def set_context(self, ti):
-        super(ElasticsearchTaskHandler, self).set_context(ti)
-        self.mark_end_on_close = not ti.raw
-
     def close(self):
         # When application exit, system shuts down all handlers by
         # calling close method. Here we check if logger is already
@@ -175,14 +322,24 @@ def close(self):
             return
 
         # Reopen the file stream, because FileHandler.close() would be called
-        # first in logging.shutdown() and the stream in it would be set to None.
-        if self.handler.stream is None or self.handler.stream.closed:
+        # first in logging.shutdown() and the stream in it would be set to None
+        if not self.write_stdout and (self.handler.stream is None or
+                                      self.handler.stream.closed):
             self.handler.stream = self.handler._open()
 
         # Mark the end of file using end of log mark,
         # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
-
-        super(ElasticsearchTaskHandler, self).close()
+        # This is not needed when write_stdout is set; it is handled in read()
+        if not self.write_stdout:
+            self.handler.stream.write(self.end_of_log_mark)
+
+        if self.write_stdout:
+            if self.handler is not None:
+                self.writer.close()
+                self.handler.close()
+                sys.stdout = sys.__stdout__
+
+        else:
+            super(ElasticsearchTaskHandler, self).close()
 
         self.closed = True
diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst
index 7305ba82d6..4c2bb4a6c7 100644
--- a/docs/howto/write-logs.rst
+++ b/docs/howto/write-logs.rst
@@ -135,3 +135,103 @@ example:
         [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
 
 Note the top line that says it's reading from the remote log file.
+
+Writing Logs to Elasticsearch
+-----------------------------
+
+Airflow can be configured to read task logs from Elasticsearch.
+
+This handler also supports two additional options:
+
+1. If you are using the Celery Executor, you can choose to have all task logs from workers emitted to the standard output of the highest parent-level process, instead of the child process.
+   This lets any number of applications collect task log information from standard out, instead of forcing the logs to be written to persistent storage.
+   To use this feature, set ``elasticsearch_write_stdout = True`` in the ``[elasticsearch]`` section of ``airflow.cfg``.
+
+2. You can also choose to have the logs emitted in JSON format. Airflow uses the standard Python ``logging`` module, and the JSON fields are taken directly from the ``LogRecord`` object.
+   To use this feature, set ``elasticsearch_json_format = True`` in ``airflow.cfg`` and list the ``LogRecord`` attributes you want captured in the comma-delimited ``elasticsearch_record_labels`` option
+   (see the sketch after this list). `Documentation on the available attributes can be found here <https://docs.python.org/3/library/logging.html#logrecord-objects/>`_.
+
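+A minimal sketch (not part of the handler itself; labels, ids and values are
+illustrative) of how the JSON formatter assembles a log line from the selected
+``LogRecord`` attributes and the task instance information:
+
+  .. code-block:: python
+
+    import json
+    import logging
+
+    # Hypothetical record labels and task instance info.
+    record_labels = ['levelname', 'name']
+    task_info = {'dag_id': 'example_dag', 'task_id': 'example_task',
+                 'execution_date': '2016-01-01T00:00:00', 'try_number': '1'}
+
+    record = logging.LogRecord('airflow.task', logging.INFO, __file__, 0,
+                               'task started', None, None)
+
+    # Mirror JsonFormatter.format(): pick the selected LogRecord attributes,
+    # merge in the processed task instance info, and emit one JSON line.
+    fields = {label: getattr(record, label) for label in record_labels}
+    fields.update(task_info)
+    print(json.dumps(fields))
+    # {"levelname": "INFO", "name": "airflow.task", "dag_id": "example_dag", ...}
+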
+First, to use the handler, ``airflow.cfg`` must be configured as follows:
+
+  .. code-block:: bash
+
+    [core]
+    # Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
+    # Users must supply an Airflow connection id that provides access to the storage
+    # location. If remote_logging is set to true, see UPDATING.md for additional
+    # configuration requirements.
+    remote_logging = True
+    logging_config_class = airflow.path.to.config.LOGGING_CONFIG
+
+    [elasticsearch]
+    elasticsearch_host = {{ host }}:{{ port }}
+    elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+    elasticsearch_end_of_log_mark = end_of_log
+    elasticsearch_write_stdout =
+    elasticsearch_json_format =
+
+If the ``elasticsearch_write_stdout`` option is desired, the following must be added:
+
+  .. code-block:: bash
+
+    [core]
+    # Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
+    # Users must supply an Airflow connection id that provides access to the storage
+    # location. If remote_logging is set to true, see UPDATING.md for additional
+    # configuration requirements.
+    remote_logging = True
+    task_log_reader = elasticsearch
+    logging_config_class = airflow.path.to.config.LOGGING_CONFIG
+
+    [elasticsearch]
+    elasticsearch_host = {{ host }}:{{ port }}
+    elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+    elasticsearch_end_of_log_mark = end_of_log
+    elasticsearch_write_stdout = True
+    elasticsearch_json_format = True
+    # A comma-delimited list of LogRecord fields to capture
+    elasticsearch_record_labels = asctime, levelname, ...
+
+To enable the Elasticsearch handler, a custom log configuration file must be added, as detailed by the following steps:
+
+1. Airflow's logging system requires a custom .py file to be located in the ``PYTHONPATH``, so that it's importable from Airflow. Start by creating a directory to store the config file. ``$AIRFLOW_HOME/config`` is recommended.
+2. Create empty files called ``$AIRFLOW_HOME/config/log_config.py`` and ``$AIRFLOW_HOME/config/__init__.py``.
+3. Copy the contents of ``airflow/config_templates/airflow_local_settings.py`` into the ``log_config.py`` file that was just created in the step above.
+4. Customize the following portions of the template:
+
+  .. code-block:: python
+
+      # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
+      LOGGING_CONFIG = ...
+
+      # Rename DEFAULT_DAG_PARSING_LOGGING_CONFIG to DAG_PARSING_LOGGING_CONFIG
+      DAG_PARSING_LOGGING_CONFIG = ...
+
+If ``elasticsearch_write_stdout`` is set in ``airflow.cfg``, add the following to your custom log configuration file.
+
+  .. code-block:: python
+
+      # Change the section in `loggers['airflow.task']` to include elasticsearch
+      'loggers': {
+        'airflow.task': {
+            'handlers': ["elasticsearch"],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+      }
+
+      # Copy elasticsearch into the `handlers` section
+      'handlers': {
+        'elasticsearch': {
+            'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'log_id_template': LOG_ID_TEMPLATE,
+            'filename_template': FILENAME_TEMPLATE,
+            'end_of_log_mark': END_OF_LOG_MARK,
+            'host': ELASTICSEARCH_HOST,
+            'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
+            'json_format': ELASTICSEARCH_JSON_FORMAT,
+            'record_labels': ELASTICSEARCH_RECORD_LABELS,
+        }
+      }
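+
+As a quick sanity check (a sketch, assuming the ``log_config.py`` module created
+above is importable from ``PYTHONPATH``), you can confirm the handler is wired
+into the ``airflow.task`` logger:
+
+  .. code-block:: python
+
+    # Import the customized dictionary and verify the elasticsearch handler
+    # is registered and attached to the airflow.task logger.
+    from log_config import LOGGING_CONFIG
+
+    assert 'elasticsearch' in LOGGING_CONFIG['handlers']
+    assert 'elasticsearch' in LOGGING_CONFIG['loggers']['airflow.task']['handlers']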
diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py
index 94184fc826..f0352c906b 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/utils/log/test_es_task_handler.py
@@ -41,6 +41,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
     EXECUTION_DATE = datetime(2016, 1, 1)
     LOG_ID = 'dag_for_testing_file_task_handler-task_for_testing' \
              '_file_log_handler-2016-01-01T00:00:00+00:00-1'
+    LOG_ID_JSON = 'dag_for_testing_file_task_handler_task_for_testing' \
+                  '_file_log_handler_2016_01_01T00_00_00_00_00_1'
 
     @elasticmock
     def setUp(self):
@@ -49,11 +51,18 @@ def setUp(self):
         self.filename_template = '{try_number}.log'
         self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}'
         self.end_of_log_mark = 'end_of_log\n'
+        self.write_stdout = None
+        self.json_format = None
+        self.record_labels = None
+
         self.es_task_handler = ElasticsearchTaskHandler(
             self.local_log_location,
             self.filename_template,
             self.log_id_template,
-            self.end_of_log_mark
+            self.end_of_log_mark,
+            self.write_stdout,
+            self.json_format,
+            self.record_labels,
         )
 
         self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
@@ -94,7 +103,25 @@ def test_read(self):
         self.assertEqual(1, metadatas[0]['offset'])
         self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
 
-    def test_read_with_none_meatadata(self):
+    def test_read_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        # end_of_log should be True so the UI stops auto-tailing
+        self.assertTrue(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+    def test_read_with_none_metadata(self):
         logs, metadatas = self.es_task_handler.read(self.ti, 1)
         self.assertEqual(1, len(logs))
         self.assertEqual(len(logs), len(metadatas))
@@ -104,6 +131,20 @@ def test_read_with_none_meatadata(self):
         self.assertTrue(
             timezone.parse(metadatas[0]['last_log_timestamp']) < pendulum.now())
 
+    def test_read_with_none_metadata_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        logs, metadatas = self.es_task_handler.read(self.ti, 1)
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        # end_of_log should be True to stop the auto-tailing loop
+        self.assertTrue(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(
+            timezone.parse(metadatas[0]['last_log_timestamp']) < pendulum.now())
+
     def test_read_nonexistent_log(self):
         ts = pendulum.now()
         # In ElasticMock, search is going to return all documents with matching index
@@ -123,6 +164,25 @@ def test_read_nonexistent_log(self):
         # last_log_timestamp won't change if no log lines read.
         self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
 
+    def test_read_nonexistent_log_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        ts = pendulum.now()
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(0, metadatas[0]['offset'])
+        # last_log_timestamp won't change if no log lines read.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
+
     def test_read_with_empty_metadata(self):
         ts = pendulum.now()
         logs, metadatas = self.es_task_handler.read(self.ti, 1, {})
@@ -149,6 +209,24 @@ def test_read_with_empty_metadata(self):
         # if not last_log_timestamp is provided.
         self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
 
+    def test_read_with_empty_metadata_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti, 1, {})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        # Assertion should be True if a log message is present, as it prevents
+        # collecting more messages than necessary (just 1)
+        self.assertTrue(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided
+        self.assertEqual(1, metadatas[0]['offset'])
+        # last_log_timestamp will be initialized using the log reading time
+        # if no last_log_timestamp is provided
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
     def test_read_timeout(self):
         ts = pendulum.now().subtract(minutes=5)
 
@@ -166,6 +244,27 @@ def test_read_timeout(self):
         self.assertEqual(0, metadatas[0]['offset'])
         self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
 
+    def test_read_timeout_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        ts = pendulum.now().subtract(minutes=5)
+
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        # Assertion should be false if a log message is not present, to keep querying ES
+        self.assertFalse(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(0, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
+
     def test_read_raises(self):
         with mock.patch.object(self.es_task_handler.log, 'exception') as mock_exception:
             with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute:
@@ -182,10 +281,35 @@ def test_read_raises(self):
         self.assertFalse(metadatas[0]['end_of_log'])
         self.assertEqual(0, metadatas[0]['offset'])
 
+    def test_read_raises_stdout_logs_json_true(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+        with mock.patch.object(self.es_task_handler.log, 'exception') as mock_exception:
+            with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute:
+                mock_execute.side_effect = Exception('Failed to read')
+                logs, metadatas = self.es_task_handler.read(self.ti, 1)
+            msg = "Could not read log with log_id: {}".format(self.LOG_ID_JSON)
+            mock_exception.assert_called_once()
+            args, kwargs = mock_exception.call_args
+            self.assertIn(msg, args[0])
+
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(0, metadatas[0]['offset'])
+
     def test_set_context(self):
         self.es_task_handler.set_context(self.ti)
         self.assertTrue(self.es_task_handler.mark_end_on_close)
 
+    def test_set_context_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.es_task_handler.set_context(self.ti)
+        self.assertTrue(self.es_task_handler.mark_end_on_close)
+
     def test_close(self):
         self.es_task_handler.set_context(self.ti)
         self.es_task_handler.close()
@@ -195,6 +319,14 @@ def test_close(self):
             self.assertIn(self.end_of_log_mark, log_file.read())
         self.assertTrue(self.es_task_handler.closed)
 
+    def test_close_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+        self.assertTrue(self.es_task_handler.closed)
+
     def test_close_no_mark_end(self):
         self.ti.raw = True
         self.es_task_handler.set_context(self.ti)
@@ -205,6 +337,15 @@ def test_close_no_mark_end(self):
             self.assertNotIn(self.end_of_log_mark, log_file.read())
         self.assertTrue(self.es_task_handler.closed)
 
+    def test_close_no_mark_end_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.ti.raw = True
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+        self.assertTrue(self.es_task_handler.closed)
+
     def test_close_closed(self):
         self.es_task_handler.closed = True
         self.es_task_handler.set_context(self.ti)
@@ -214,6 +355,16 @@ def test_close_closed(self):
                   'r') as log_file:
             self.assertEqual(0, len(log_file.read()))
 
+    def test_close_closed_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.es_task_handler.closed = True
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+
+        self.assertTrue(self.es_task_handler.closed)
+
     def test_close_with_no_handler(self):
         self.es_task_handler.set_context(self.ti)
         self.es_task_handler.handler = None
@@ -224,6 +375,16 @@ def test_close_with_no_handler(self):
             self.assertEqual(0, len(log_file.read()))
         self.assertTrue(self.es_task_handler.closed)
 
+    def test_close_with_no_handler_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler = None
+        self.es_task_handler.close()
+
+        self.assertTrue(self.es_task_handler.closed)
+
     def test_close_with_no_stream(self):
         self.es_task_handler.set_context(self.ti)
         self.es_task_handler.handler.stream = None
@@ -243,6 +404,22 @@ def test_close_with_no_stream(self):
             self.assertIn(self.end_of_log_mark, log_file.read())
         self.assertTrue(self.es_task_handler.closed)
 
+    def test_close_with_no_stream_stdout_logs(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler.stream = None
+        self.es_task_handler.close()
+
+        self.assertTrue(self.es_task_handler.closed)
+
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler.stream.close()
+        self.es_task_handler.close()
+
+        self.assertTrue(self.es_task_handler.closed)
+
     def test_render_log_id(self):
         expected_log_id = 'dag_for_testing_file_task_handler-' \
                           'task_for_testing_file_log_handler-2016-01-01T00:00:00+00:00-1'
@@ -258,3 +435,27 @@ def test_render_log_id(self):
         )
         log_id = self.es_task_handler._render_log_id(self.ti, 1)
         self.assertEqual(expected_log_id, log_id)
+
+    def test_render_log_id_json_true(self):
+        self.es_task_handler.write_stdout = True
+        self.es_task_handler.json_format = True
+
+        expected_log_id = 'dag_for_testing_file_task_handler_' \
+                          'task_for_testing_file_log_handler_2016_01_01T00_00_00_00_00_1'
+        log_id = self.es_task_handler._render_log_id(self.ti, 1)
+        self.assertEqual(expected_log_id, log_id)
+
+        # Switch to use jinja template
+        self.es_task_handler.write_stdout = None
+        self.es_task_handler.json_format = None
+
+        jinja_expected_log_id = 'dag_for_testing_file_task_handler-' \
+                                'task_for_testing_file_log_handler-2016-01-01T00:00:00+00:00-1'
+        self.es_task_handler = ElasticsearchTaskHandler(
+            self.local_log_location,
+            self.filename_template,
+            '{{ ti.dag_id }}-{{ ti.task_id }}-{{ ts }}-{{ try_number }}',
+            self.end_of_log_mark
+        )
+        jinja_log_id = self.es_task_handler._render_log_id(self.ti, 1)
+        self.assertEqual(jinja_expected_log_id, jinja_log_id)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services