You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/06 11:41:26 UTC

incubator-airflow git commit: [AIRFLOW-1564] Use Jinja2 to render logging filename

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 32750601a -> 4c674ccff


[AIRFLOW-1564] Use Jinja2 to render logging filename

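Filename templates written in Python's str.format style (e.g. '{dag_id}/{task_id}/{execution_date}/{try_number}.log') are still supported for backwards compatibility.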

Closes #2565 from NielsZeilemaker/AIRFLOW-1564


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c674ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c674ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c674ccf

Branch: refs/heads/master
Commit: 4c674ccffda1fbc38b8cc044b0e2c004422a2035
Parents: 3275060
Author: Niels Zeilemaker <ni...@godatadriven.com>
Authored: Wed Sep 6 13:41:20 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 6 13:41:20 2017 +0200

----------------------------------------------------------------------
 .../config_templates/default_airflow_logging.py |  2 +-
 airflow/utils/log/file_task_handler.py          | 25 +++++++++++++++-----
 airflow/utils/log/gcs_task_handler.py           |  8 ++-----
 airflow/utils/log/s3_task_handler.py            |  8 ++-----
 tests/utils/test_log_handlers.py                | 23 ++++++++++++++++++
 5 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
index d6ae036..523b4e8 100644
--- a/airflow/config_templates/default_airflow_logging.py
+++ b/airflow/config_templates/default_airflow_logging.py
@@ -37,7 +37,7 @@ if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
 elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
     GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
 
-FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
 
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index bce974c..7392aae 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -15,6 +15,8 @@
 import logging
 import os
 
+from jinja2 import Template
+
 from airflow import configuration as conf
 from airflow.configuration import AirflowConfigException
 from airflow.utils.file import mkdirs
@@ -37,6 +39,10 @@ class FileTaskHandler(logging.Handler):
         self.handler = None
         self.local_base = base_log_folder
         self.filename_template = filename_template
+        self.filename_jinja_template = None
+
+        if "{{" in self.filename_template: #jinja mode
+            self.filename_jinja_template = Template(self.filename_template)
 
     def set_context(self, ti):
         """
@@ -59,6 +65,17 @@ class FileTaskHandler(logging.Handler):
     def close(self):
         if self.handler is not None:
             self.handler.close()
+            
+    def _render_filename(self, ti, try_number):
+        if self.filename_jinja_template:
+            jinja_context = ti.get_template_context()
+            jinja_context['try_number'] = try_number
+            return self.filename_jinja_template.render(**jinja_context) 
+            
+        return self.filename_template.format(dag_id=ti.dag_id, 
+                                             task_id=ti.task_id,
+                                             execution_date=ti.execution_date.isoformat(), 
+                                             try_number=try_number)
 
     def _read(self, ti, try_number):
         """
@@ -71,9 +88,7 @@ class FileTaskHandler(logging.Handler):
         # Task instance here might be different from task instance when
         # initializing the handler. Thus explicitly getting log location
         # is needed to get correct log path.
-        log_relative_path = self.filename_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        log_relative_path = self._render_filename(ti, try_number + 1)
         loc = os.path.join(self.local_base, log_relative_path)
         log = ""
 
@@ -153,9 +168,7 @@ class FileTaskHandler(logging.Handler):
         # writable by both users, then it's possible that re-running a task
         # via the UI (or vice versa) results in a permission error as the task
         # tries to write to a log file created by the other user.
-        relative_path = self.filename_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+        relative_path = self._render_filename(ti, ti.try_number + 1)
         full_path = os.path.join(self.local_base, relative_path)
         directory = os.path.dirname(full_path)
         # Create the log file and give it group writable permissions

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 5b35907..c340f10 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -39,9 +39,7 @@ class GCSTaskHandler(FileTaskHandler):
         # Log relative path is used to construct local and remote
         # log path to upload log files into GCS and read from the
         # remote location.
-        self.log_relative_path = self.filename_template(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+        self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
 
     def close(self):
         """
@@ -76,9 +74,7 @@ class GCSTaskHandler(FileTaskHandler):
         # Explicitly getting log relative path is necessary as the given
         # task instance might be different than task instance passed in
         # in set_context method.
-        log_relative_path = self.filename_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        log_relative_path = self._render_filename(ti, try_number + 1)
         remote_loc = os.path.join(self.remote_base, log_relative_path)
 
         gcs_log = logging_utils.GCSLog()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 7268d22..51baaac 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -35,9 +35,7 @@ class S3TaskHandler(FileTaskHandler):
         super(S3TaskHandler, self).set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
-        self.log_relative_path = self.filename_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+        self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
 
     def close(self):
         """
@@ -72,9 +70,7 @@ class S3TaskHandler(FileTaskHandler):
         # Explicitly getting log relative path is necessary as the given
         # task instance might be different than task instance passed in
         # in set_context method.
-        log_relative_path = self.filename_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        log_relative_path = self._render_filename(ti, try_number + 1)
         remote_loc = os.path.join(self.remote_base, log_relative_path)
 
         s3_log = logging_utils.S3Log()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 5b0d8a6..8337c5d 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -23,6 +23,7 @@ from datetime import datetime
 from airflow.models import TaskInstance, DAG
 from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.log.file_task_handler import FileTaskHandler
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 TASK_LOGGER = 'airflow.task'
@@ -71,3 +72,25 @@ class TestFileTaskLogHandler(unittest.TestCase):
 
         # Remove the generated tmp log file.
         os.remove(log_filename)
+
+    
+class TestFilenameRendering(unittest.TestCase):
+    
+    def setUp(self):
+        dag = DAG('dag_for_testing_filename_rendering', start_date=DEFAULT_DATE)
+        task = DummyOperator(task_id='task_for_testing_filename_rendering', dag=dag)
+        self.ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+    
+    def test_python_formatting(self):
+        expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
+        
+        fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
+        rendered_filename = fth._render_filename(self.ti, 42)
+        self.assertEqual(expected_filename, rendered_filename)
+        
+    def test_jinja_rendering(self):
+        expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
+        
+        fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
+        rendered_filename = fth._render_filename(self.ti, 42)
+        self.assertEqual(expected_filename, rendered_filename)
\ No newline at end of file