You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/02/24 13:47:09 UTC
[airflow] branch main updated: Upgrade and record elasticsearch log_id_template changes (#21734)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b6e141a Upgrade and record elasticsearch log_id_template changes (#21734)
b6e141a is described below
commit b6e141aefe2bd335bc2c8104832efd31eb8208da
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Thu Feb 24 13:46:30 2022 +0000
Upgrade and record elasticsearch log_id_template changes (#21734)
---
UPDATING.md | 14 ++++++++++++++
airflow/configuration.py | 7 +++++++
airflow/utils/db.py | 41 +++++++++++++++++++++++++++++------------
3 files changed, 50 insertions(+), 12 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index a9ba808..bd9e5e3 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -129,6 +129,20 @@ Previously, a task’s log is dynamically rendered from the `[core] log_filename
A new `log_template` table is introduced to solve this problem. This table is synchronised with the aforementioned config values every time Airflow starts, and a new field `log_template_id` is added to every DAG run to point to the format used by tasks (`NULL` indicates the first ever entry for compatibility).
+### Default templates for log filenames and elasticsearch log_id changed
+
+In order to support Dynamic Task Mapping the default templates for per-task instance logging have changed. If your config contains the old default values they will be upgraded-in-place.
+
+If you are happy with the new config values you should _remove_ the setting in `airflow.cfg` and let the default value be used. Old default values were:
+
+
+- `[core] log_filename_template`: `{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`
+- `[elasticsearch] log_id_template`: `{dag_id}-{task_id}-{execution_date}-{try_number}`
+
+`[core] log_filename_template` now uses "hive partition style" of `dag_id=<id>/run_id=<id>` by default, which may cause problems on some older FAT filesystems. If this affects you then you will have to change the log template.
+
+If you have customized the templates you should ensure that they contain `{{ ti.map_index }}` if you want to use dynamically mapped tasks.
+
### `airflow.models.base.Operator` is removed
Previously, there was an empty class `airflow.models.base.Operator` for “type hinting”. This class was never really useful for anything (everything it did could be done better with `airflow.models.baseoperator.BaseOperator`), and has been removed. If you are relying on the class’s existence, use `BaseOperator` (for concrete operators), `airflow.models.abstractoperator.AbstractOperator` (the base class of both `BaseOperator` and the AIP-42 `MappedOperator`), or `airflow.models.operator.Operator` (a union type `BaseOperator | MappedOperator` for type hinting).
diff --git a/airflow/configuration.py b/airflow/configuration.py
index c48f851..03a1e7a 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -218,6 +218,13 @@ class AirflowConfigParser(ConfigParser):
'3.0',
),
},
+ 'elasticsearch': {
+ 'log_id_template': (
+ re.compile('^' + re.escape('{dag_id}-{task_id}-{run_id}-{try_number}') + '$'),
+ '{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}',
+ 3.0,
+ )
+ },
}
_available_logging_levels = ['CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG']
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index e9b2ef2..2038d92 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -748,24 +748,41 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
This checks if the last row fully matches the current config values, and
insert a new row if not.
"""
-
- def check_templates(filename, elasticsearch_id):
- stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
-
- if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
- session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
-
filename = conf.get("logging", "log_filename_template")
elasticsearch_id = conf.get("elasticsearch", "log_id_template")
# Before checking if the _current_ value exists, we need to check if the old config value we upgraded in
# place exists!
- pre_upgrade_filename = conf.upgraded_values.get(('logging', 'log_filename_template'), None)
- if pre_upgrade_filename is not None:
- check_templates(pre_upgrade_filename, elasticsearch_id)
- session.flush()
+ pre_upgrade_filename = conf.upgraded_values.get(("logging", "log_filename_template"), filename)
+ pre_upgrade_elasticsearch_id = conf.upgraded_values.get(
+ ("elasticsearch", "log_id_template"), elasticsearch_id
+ )
+
+ if pre_upgrade_filename != filename or pre_upgrade_elasticsearch_id != elasticsearch_id:
+            # The previous non-upgraded value likely won't be the _latest_ value (as after we've
+            # recorded the upgraded value it will be second-to-newest), so we'll have to just search which is okay
+ # as this is a table with a tiny number of rows
+ row = (
+ session.query(LogTemplate.id)
+ .filter(
+ or_(
+ LogTemplate.filename == pre_upgrade_filename,
+ LogTemplate.elasticsearch_id == pre_upgrade_elasticsearch_id,
+ )
+ )
+ .order_by(LogTemplate.id.desc())
+ .first()
+ )
+ if not row:
+ session.add(
+ LogTemplate(filename=pre_upgrade_filename, elasticsearch_id=pre_upgrade_elasticsearch_id)
+ )
+ session.flush()
+
+ stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
- check_templates(filename, elasticsearch_id)
+ if not stored or stored.filename != filename or stored.elasticsearch_id != elasticsearch_id:
+ session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
def check_conn_id_duplicates(session: Session) -> Iterable[str]: