Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/15 20:51:59 UTC

[airflow] 01/05: Use compat data interval shim in log handlers (#21289)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f27357fa415ff23021073a3a8c218fe9b99a143b
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Sat Feb 12 11:40:29 2022 +0800

    Use compat data interval shim in log handlers (#21289)
    
    (cherry picked from commit 44bd211b19dcb75eeb53ced5bea2cf0c80654b1a)
---
 .../providers/elasticsearch/log/es_task_handler.py | 27 ++++++++++++-----
 airflow/utils/log/file_task_handler.py             | 35 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 16 deletions(-)
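
The shim introduced below prefers DAG.get_run_data_interval(), which can
compute a run's data interval even when the DagRun row predates the
interval columns added in Airflow 2.2, and falls back to reading the raw
data_interval_start/data_interval_end columns when ti.task (and therefore
ti.task.dag) is not set. A minimal sketch of the pattern, using a
hypothetical FakeDagRun stand-in rather than the real Airflow model:

    from datetime import datetime
    from typing import Optional, Tuple


    class FakeDagRun:
        """Hypothetical stand-in for airflow.models.DagRun.

        Only the two interval columns added in Airflow 2.2 are modeled.
        """

        def __init__(self, start: Optional[datetime] = None, end: Optional[datetime] = None):
            self.data_interval_start = start
            self.data_interval_end = end


    def compat_data_interval(ti, dag_run) -> Tuple[Optional[datetime], Optional[datetime]]:
        # Prefer the DAG's own calculation; fall back to the raw columns
        # when the task (and therefore the DAG object) is unavailable.
        try:
            return ti.task.dag.get_run_data_interval(dag_run)
        except AttributeError:  # ti.task is not always set.
            return (dag_run.data_interval_start, dag_run.data_interval_end)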

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cd08971..b591aef 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,15 +101,25 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.context_set = False
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
-        dag_run = ti.dag_run
+        dag_run = ti.get_dagrun()
+        try:
+            data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+        except AttributeError:  # ti.task is not always set.
+            data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
 
         if self.json_format:
-            data_interval_start = self._clean_date(dag_run.data_interval_start)
-            data_interval_end = self._clean_date(dag_run.data_interval_end)
+            data_interval_start = self._clean_date(data_interval[0])
+            data_interval_end = self._clean_date(data_interval[1])
             execution_date = self._clean_date(dag_run.execution_date)
         else:
-            data_interval_start = dag_run.data_interval_start.isoformat()
-            data_interval_end = dag_run.data_interval_end.isoformat()
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
             execution_date = dag_run.execution_date.isoformat()
 
         return self.log_id_template.format(
@@ -123,14 +133,15 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
 
     @staticmethod
-    def _clean_date(value: datetime) -> str:
+    def _clean_date(value: Optional[datetime]) -> str:
         """
         Clean up a date value so that it is safe to query in elasticsearch
         by removing reserved characters.
-        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
 
-        :param execution_date: execution date of the dag run.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
         """
+        if value is None:
+            return ""
         return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
     def _group_logs_by_host(self, logs):
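
For reference, the amended _clean_date now tolerates None so that runs
without a data interval still render an empty (but queryable) field. Its
contract can be restated standalone, outside Airflow:

    from datetime import datetime
    from typing import Optional

    def _clean_date(value: Optional[datetime]) -> str:
        # Render with '_' separators to avoid characters such as ':'
        # that are reserved in Elasticsearch query strings.
        if value is None:
            return ""
        return value.strftime("%Y_%m_%dT%H_%M_%S_%f")

    assert _clean_date(None) == ""
    assert _clean_date(datetime(2022, 2, 12, 11, 40, 29)) == "2022_02_12T11_40_29_000000"
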
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6e57c67..e13b8d4 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,8 +18,9 @@
 """File logging handler for tasks."""
 import logging
 import os
+from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Tuple
 
 import httpx
 from itsdangerous import TimedJSONWebSignatureSerializer
@@ -82,13 +83,31 @@ class FileTaskHandler(logging.Handler):
                 context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
             context["try_number"] = try_number
             return render_template_to_string(self.filename_jinja_template, context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.get_dagrun().logical_date.isoformat(),
-            try_number=try_number,
-        )
+        elif self.filename_template:
+            dag_run = ti.get_dagrun()
+            try:
+                data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+            except AttributeError:  # ti.task is not always set.
+                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
+            return self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                run_id=ti.run_id,
+                data_interval_start=data_interval_start,
+                data_interval_end=data_interval_end,
+                execution_date=ti.get_dagrun().logical_date.isoformat(),
+                try_number=try_number,
+            )
+        else:
+            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
 
     def _read_grouped_logs(self):
         return False
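
The net effect in file_task_handler.py is that str.format-style filename
templates can now reference run_id, data_interval_start, and
data_interval_end alongside the pre-existing fields; keys a template does
not use are simply ignored by str.format, so the handler can pass the full
superset unconditionally. A quick illustration with a hypothetical
template (the template actually used comes from the log_filename_template
configuration option):

    template = "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log"
    print(
        template.format(
            dag_id="example_dag",
            task_id="example_task",
            run_id="scheduled__2022-02-12T00:00:00+00:00",
            data_interval_start="2022-02-12T00:00:00+00:00",
            data_interval_end="2022-02-13T00:00:00+00:00",
            execution_date="2022-02-12T00:00:00+00:00",
            try_number=1,
        )
    )
    # dag_id=example_dag/run_id=scheduled__2022-02-12T00:00:00+00:00/task_id=example_task/attempt=1.log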