You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:26 UTC
[airflow] 10/41: Simplify RTIF.delete_old_records() (#26667)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 29bc32a67ae9dc645324996eaf64ba19b032193c
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 21:49:17 2022 +0800
Simplify RTIF.delete_old_records() (#26667)
A lot of the code and comments are actually not relevant since we've
removed the funky execution_date-based filtering in 2.3, and we can
simplify the implementation quite a bit now.
(cherry picked from commit 0e79dd0b1722a610c898da0ba8557b8a94da568c)
---
airflow/models/renderedtifields.py | 68 +++++++++++++++-----------------------
airflow/utils/sqlalchemy.py | 20 ++++++++++-
2 files changed, 46 insertions(+), 42 deletions(-)
diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 2de03fba67..c0f30a2a1a 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -19,9 +19,10 @@
from __future__ import annotations
import os
+from typing import TYPE_CHECKING
import sqlalchemy_jsonfield
-from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship
@@ -32,6 +33,10 @@ from airflow.serialization.helpers import serialize_template_field
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import tuple_not_in_condition
+
+if TYPE_CHECKING:
+ from sqlalchemy.sql import FromClause
class RenderedTaskInstanceFields(Base):
@@ -183,9 +188,9 @@ class RenderedTaskInstanceFields(Base):
cls,
task_id: str,
dag_id: str,
- num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
- session: Session = None,
- ):
+ num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
+ session: Session = NEW_SESSION,
+ ) -> None:
"""
Keep only Last X (num_to_keep) number of records for a task by deleting others.
@@ -211,49 +216,30 @@ class RenderedTaskInstanceFields(Base):
.limit(num_to_keep)
)
- if session.bind.dialect.name in ["postgresql", "sqlite"]:
- # Fetch Top X records given dag_id & task_id ordered by Execution Date
- subq1 = tis_to_keep_query.subquery()
- excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
- session.query(cls).filter(
- cls.dag_id == dag_id,
- cls.task_id == task_id,
- tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
- ).delete(synchronize_session=False)
- elif session.bind.dialect.name in ["mysql"]:
- cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
- else:
- # Fetch Top X records given dag_id & task_id ordered by Execution Date
- tis_to_keep = tis_to_keep_query.all()
-
- filter_tis = [
- not_(
- and_(
- cls.dag_id == ti.dag_id,
- cls.task_id == ti.task_id,
- cls.run_id == ti.run_id,
- )
- )
- for ti in tis_to_keep
- ]
-
- session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
-
+ cls._do_delete_old_records(
+ dag_id=dag_id,
+ task_id=task_id,
+ ti_clause=tis_to_keep_query.subquery(),
+ session=session,
+ )
session.flush()
@classmethod
@retry_db_transaction
- def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
- # Fetch Top X records given dag_id & task_id ordered by Execution Date
- subq1 = tis_to_keep_query.subquery('subq1')
- # Second Subquery
- # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
- # Limitation: This version of MySQL does not yet support
- # LIMIT & IN/ALL/ANY/SOME subquery
- subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2')
+ def _do_delete_old_records(
+ cls,
+ *,
+ task_id: str,
+ dag_id: str,
+ ti_clause: FromClause,
+ session: Session,
+ ) -> None:
# This query might deadlock occasionally and it should be retried if fails (see decorator)
session.query(cls).filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
- tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
+ tuple_not_in_condition(
+ (cls.dag_id, cls.task_id, cls.run_id),
+ session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
+ ),
).delete(synchronize_session=False)
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index ef0d29ebf8..0531c29dbf 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -25,7 +25,7 @@ from typing import Any, Iterable
import pendulum
from dateutil import relativedelta
-from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_
+from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
@@ -422,3 +422,21 @@ def tuple_in_condition(
if not clauses:
return false()
return or_(*clauses)
+
+
+def tuple_not_in_condition(
+ columns: tuple[ColumnElement, ...],
+ collection: Iterable[Any],
+) -> ColumnOperators:
+ """Generates a tuple-not-in-collection operator to use in ``.filter()``.
+
+ This is similar to ``tuple_in_condition`` except generating ``NOT IN``.
+
+ :meta private:
+ """
+ if settings.engine.dialect.name != "mssql":
+ return tuple_(*columns).not_in(collection)
+ clauses = [or_(*(c != v for c, v in zip(columns, values))) for values in collection]
+ if not clauses:
+ return true()
+ return and_(*clauses)