You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/02/24 13:47:09 UTC

[airflow] branch main updated: Upgrade and record elasticsearch log_id_template changes (#21734)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b6e141a  Upgrade and record elasticsearch log_id_template changes (#21734)
b6e141a is described below

commit b6e141aefe2bd335bc2c8104832efd31eb8208da
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Thu Feb 24 13:46:30 2022 +0000

    Upgrade and record elasticsearch log_id_template changes (#21734)
---
 UPDATING.md              | 14 ++++++++++++++
 airflow/configuration.py |  7 +++++++
 airflow/utils/db.py      | 41 +++++++++++++++++++++++++++++------------
 3 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index a9ba808..bd9e5e3 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -129,6 +129,20 @@ Previously, a task’s log is dynamically rendered from the `[core] log_filename
 
 A new `log_template` table is introduced to solve this problem. This table is synchronised with the aforementioned config values every time Airflow starts, and a new field `log_template_id` is added to every DAG run to point to the format used by tasks (`NULL` indicates the first ever entry for compatibility).
 
+### Default templates for log filenames and elasticsearch log_id changed
+
+In order to support Dynamic Task Mapping, the default templates for per-task-instance logging have changed. If your config contains the old default values they will be upgraded in place.
+
+If you are happy with the new config values you should _remove_ the setting in `airflow.cfg` and let the default value be used. Old default values were:
+
+
+- `[core] log_filename_template`: `{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`
+- `[elasticsearch] log_id_template`: `{dag_id}-{task_id}-{execution_date}-{try_number}`
+
+`[core] log_filename_template` now uses "hive partition style" of `dag_id=<id>/run_id=<id>` by default, which may cause problems on some older FAT filesystems. If this affects you then you will have to change the log template.
+
+If you have customized the templates you should ensure that they contain `{{ ti.map_index }}` if you want to use dynamically mapped tasks.
+
 ### `airflow.models.base.Operator` is removed
 
 Previously, there was an empty class `airflow.models.base.Operator` for “type hinting”. This class was never really useful for anything (everything it did could be done better with `airflow.models.baseoperator.BaseOperator`), and has been removed. If you are relying on the class’s existence, use `BaseOperator` (for concrete operators), `airflow.models.abstractoperator.AbstractOperator` (the base class of both `BaseOperator` and the AIP-42 `MappedOperator`), or `airflow.models.operator.Op [...]
diff --git a/airflow/configuration.py b/airflow/configuration.py
index c48f851..03a1e7a 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -218,6 +218,13 @@ class AirflowConfigParser(ConfigParser):
                 '3.0',
             ),
         },
+        'elasticsearch': {
+            'log_id_template': (
+                re.compile('^' + re.escape('{dag_id}-{task_id}-{run_id}-{try_number}') + '$'),
+                '{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}',
+                3.0,
+            )
+        },
     }
 
     _available_logging_levels = ['CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG']
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index e9b2ef2..2038d92 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -748,24 +748,41 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     This checks if the last row fully matches the current config values, and
     insert a new row if not.
     """
-
-    def check_templates(filename, elasticsearch_id):
-        stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
-
-        if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
-            session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
-
     filename = conf.get("logging", "log_filename_template")
     elasticsearch_id = conf.get("elasticsearch", "log_id_template")
 
     # Before checking if the _current_ value exists, we need to check if the old config value we upgraded in
     # place exists!
-    pre_upgrade_filename = conf.upgraded_values.get(('logging', 'log_filename_template'), None)
-    if pre_upgrade_filename is not None:
-        check_templates(pre_upgrade_filename, elasticsearch_id)
-        session.flush()
+    pre_upgrade_filename = conf.upgraded_values.get(("logging", "log_filename_template"), filename)
+    pre_upgrade_elasticsearch_id = conf.upgraded_values.get(
+        ("elasticsearch", "log_id_template"), elasticsearch_id
+    )
+
+    if pre_upgrade_filename != filename or pre_upgrade_elasticsearch_id != elasticsearch_id:
+        # The previous non-upgraded value likely won't be the _latest_ value (as after we've recorded the
+        # upgraded value it will be second-to-newest), so we'll have to just search, which is okay
+        # as this is a table with a tiny number of rows
+        row = (
+            session.query(LogTemplate.id)
+            .filter(
+                or_(
+                    LogTemplate.filename == pre_upgrade_filename,
+                    LogTemplate.elasticsearch_id == pre_upgrade_elasticsearch_id,
+                )
+            )
+            .order_by(LogTemplate.id.desc())
+            .first()
+        )
+        if not row:
+            session.add(
+                LogTemplate(filename=pre_upgrade_filename, elasticsearch_id=pre_upgrade_elasticsearch_id)
+            )
+            session.flush()
+
+    stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
 
-    check_templates(filename, elasticsearch_id)
+    if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
+        session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
 
 
 def check_conn_id_duplicates(session: Session) -> Iterable[str]: