You are viewing a plain text version of this content; the canonical (HTML) version is available from the original mailing-list archive.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/06 11:41:26 UTC
incubator-airflow git commit: [AIRFLOW-1564] Use Jinja2 to render logging filename
Repository: incubator-airflow
Updated Branches:
refs/heads/master 32750601a -> 4c674ccff
[AIRFLOW-1564] Use Jinja2 to render logging filename
Still backwards compatible with python format
Closes #2565 from NielsZeilemaker/AIRFLOW-1564
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c674ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c674ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c674ccf
Branch: refs/heads/master
Commit: 4c674ccffda1fbc38b8cc044b0e2c004422a2035
Parents: 3275060
Author: Niels Zeilemaker <ni...@godatadriven.com>
Authored: Wed Sep 6 13:41:20 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 6 13:41:20 2017 +0200
----------------------------------------------------------------------
.../config_templates/default_airflow_logging.py | 2 +-
airflow/utils/log/file_task_handler.py | 25 +++++++++++++++-----
airflow/utils/log/gcs_task_handler.py | 8 ++-----
airflow/utils/log/s3_task_handler.py | 8 ++-----
tests/utils/test_log_handlers.py | 23 ++++++++++++++++++
5 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
index d6ae036..523b4e8 100644
--- a/airflow/config_templates/default_airflow_logging.py
+++ b/airflow/config_templates/default_airflow_logging.py
@@ -37,7 +37,7 @@ if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
-FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
DEFAULT_LOGGING_CONFIG = {
'version': 1,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index bce974c..7392aae 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -15,6 +15,8 @@
import logging
import os
+from jinja2 import Template
+
from airflow import configuration as conf
from airflow.configuration import AirflowConfigException
from airflow.utils.file import mkdirs
@@ -37,6 +39,10 @@ class FileTaskHandler(logging.Handler):
self.handler = None
self.local_base = base_log_folder
self.filename_template = filename_template
+ self.filename_jinja_template = None
+
+ if "{{" in self.filename_template: #jinja mode
+ self.filename_jinja_template = Template(self.filename_template)
def set_context(self, ti):
"""
@@ -59,6 +65,17 @@ class FileTaskHandler(logging.Handler):
def close(self):
if self.handler is not None:
self.handler.close()
+
+ def _render_filename(self, ti, try_number):
+ if self.filename_jinja_template:
+ jinja_context = ti.get_template_context()
+ jinja_context['try_number'] = try_number
+ return self.filename_jinja_template.render(**jinja_context)
+
+ return self.filename_template.format(dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ execution_date=ti.execution_date.isoformat(),
+ try_number=try_number)
def _read(self, ti, try_number):
"""
@@ -71,9 +88,7 @@ class FileTaskHandler(logging.Handler):
# Task instance here might be different from task instance when
# initializing the handler. Thus explicitly getting log location
# is needed to get correct log path.
- log_relative_path = self.filename_template.format(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number + 1)
loc = os.path.join(self.local_base, log_relative_path)
log = ""
@@ -153,9 +168,7 @@ class FileTaskHandler(logging.Handler):
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
- relative_path = self.filename_template.format(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+ relative_path = self._render_filename(ti, ti.try_number + 1)
full_path = os.path.join(self.local_base, relative_path)
directory = os.path.dirname(full_path)
# Create the log file and give it group writable permissions
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 5b35907..c340f10 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -39,9 +39,7 @@ class GCSTaskHandler(FileTaskHandler):
# Log relative path is used to construct local and remote
# log path to upload log files into GCS and read from the
# remote location.
- self.log_relative_path = self.filename_template(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+ self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
def close(self):
"""
@@ -76,9 +74,7 @@ class GCSTaskHandler(FileTaskHandler):
# Explicitly getting log relative path is necessary as the given
# task instance might be different than task instance passed in
# in set_context method.
- log_relative_path = self.filename_template.format(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number + 1)
remote_loc = os.path.join(self.remote_base, log_relative_path)
gcs_log = logging_utils.GCSLog()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 7268d22..51baaac 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -35,9 +35,7 @@ class S3TaskHandler(FileTaskHandler):
super(S3TaskHandler, self).set_context(ti)
# Local location and remote location is needed to open and
# upload local log file to S3 remote storage.
- self.log_relative_path = self.filename_template.format(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+ self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
def close(self):
"""
@@ -72,9 +70,7 @@ class S3TaskHandler(FileTaskHandler):
# Explicitly getting log relative path is necessary as the given
# task instance might be different than task instance passed in
# in set_context method.
- log_relative_path = self.filename_template.format(
- dag_id=ti.dag_id, task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number + 1)
remote_loc = os.path.join(self.remote_base, log_relative_path)
s3_log = logging_utils.S3Log()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c674ccf/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 5b0d8a6..8337c5d 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -23,6 +23,7 @@ from datetime import datetime
from airflow.models import TaskInstance, DAG
from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG
from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.log.file_task_handler import FileTaskHandler
DEFAULT_DATE = datetime(2016, 1, 1)
TASK_LOGGER = 'airflow.task'
@@ -71,3 +72,25 @@ class TestFileTaskLogHandler(unittest.TestCase):
# Remove the generated tmp log file.
os.remove(log_filename)
+
+
+class TestFilenameRendering(unittest.TestCase):
+
+ def setUp(self):
+ dag = DAG('dag_for_testing_filename_rendering', start_date=DEFAULT_DATE)
+ task = DummyOperator(task_id='task_for_testing_filename_rendering', dag=dag)
+ self.ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+
+ def test_python_formatting(self):
+ expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
+
+ fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
+ rendered_filename = fth._render_filename(self.ti, 42)
+ self.assertEqual(expected_filename, rendered_filename)
+
+ def test_jinja_rendering(self):
+ expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
+
+ fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
+ rendered_filename = fth._render_filename(self.ti, 42)
+ self.assertEqual(expected_filename, rendered_filename)
\ No newline at end of file