You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:26 UTC

[airflow] 10/41: Simplify RTIF.delete_old_records() (#26667)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 29bc32a67ae9dc645324996eaf64ba19b032193c
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 21:49:17 2022 +0800

    Simplify RTIF.delete_old_records() (#26667)
    
    A lot of the code and comments are no longer relevant since we've
    removed the funky execution_date-based filtering in 2.3, so we can
    simplify the implementation quite a bit now.
    
    (cherry picked from commit 0e79dd0b1722a610c898da0ba8557b8a94da568c)
---
 airflow/models/renderedtifields.py | 68 +++++++++++++++-----------------------
 airflow/utils/sqlalchemy.py        | 20 ++++++++++-
 2 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 2de03fba67..c0f30a2a1a 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -19,9 +19,10 @@
 from __future__ import annotations
 
 import os
+from typing import TYPE_CHECKING
 
 import sqlalchemy_jsonfield
-from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import Session, relationship
 
@@ -32,6 +33,10 @@ from airflow.serialization.helpers import serialize_template_field
 from airflow.settings import json
 from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import tuple_not_in_condition
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql import FromClause
 
 
 class RenderedTaskInstanceFields(Base):
@@ -183,9 +188,9 @@ class RenderedTaskInstanceFields(Base):
         cls,
         task_id: str,
         dag_id: str,
-        num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
-        session: Session = None,
-    ):
+        num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
+        session: Session = NEW_SESSION,
+    ) -> None:
         """
         Keep only Last X (num_to_keep) number of records for a task by deleting others.
 
@@ -211,49 +216,30 @@ class RenderedTaskInstanceFields(Base):
             .limit(num_to_keep)
         )
 
-        if session.bind.dialect.name in ["postgresql", "sqlite"]:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            subq1 = tis_to_keep_query.subquery()
-            excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
-            session.query(cls).filter(
-                cls.dag_id == dag_id,
-                cls.task_id == task_id,
-                tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
-            ).delete(synchronize_session=False)
-        elif session.bind.dialect.name in ["mysql"]:
-            cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
-        else:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            tis_to_keep = tis_to_keep_query.all()
-
-            filter_tis = [
-                not_(
-                    and_(
-                        cls.dag_id == ti.dag_id,
-                        cls.task_id == ti.task_id,
-                        cls.run_id == ti.run_id,
-                    )
-                )
-                for ti in tis_to_keep
-            ]
-
-            session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
-
+        cls._do_delete_old_records(
+            dag_id=dag_id,
+            task_id=task_id,
+            ti_clause=tis_to_keep_query.subquery(),
+            session=session,
+        )
         session.flush()
 
     @classmethod
     @retry_db_transaction
-    def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
-        # Fetch Top X records given dag_id & task_id ordered by Execution Date
-        subq1 = tis_to_keep_query.subquery('subq1')
-        # Second Subquery
-        # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
-        # Limitation: This version of MySQL does not yet support
-        # LIMIT & IN/ALL/ANY/SOME subquery
-        subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2')
+    def _do_delete_old_records(
+        cls,
+        *,
+        task_id: str,
+        dag_id: str,
+        ti_clause: FromClause,
+        session: Session,
+    ) -> None:
         # This query might deadlock occasionally and it should be retried if fails (see decorator)
         session.query(cls).filter(
             cls.dag_id == dag_id,
             cls.task_id == task_id,
-            tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
+            tuple_not_in_condition(
+                (cls.dag_id, cls.task_id, cls.run_id),
+                session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
+            ),
         ).delete(synchronize_session=False)
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index ef0d29ebf8..0531c29dbf 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -25,7 +25,7 @@ from typing import Any, Iterable
 
 import pendulum
 from dateutil import relativedelta
-from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_
+from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
 from sqlalchemy.dialects import mssql, mysql
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import Session
@@ -422,3 +422,21 @@ def tuple_in_condition(
     if not clauses:
         return false()
     return or_(*clauses)
+
+
+def tuple_not_in_condition(
+    columns: tuple[ColumnElement, ...],
+    collection: Iterable[Any],
+) -> ColumnOperators:
+    """Generates a tuple-not-in-collection operator to use in ``.filter()``.
+
+    This is similar to ``tuple_in_condition`` except generating ``NOT IN``.
+
+    :meta private:
+    """
+    if settings.engine.dialect.name != "mssql":
+        return tuple_(*columns).not_in(collection)
+    clauses = [or_(*(c != v for c, v in zip(columns, values))) for values in collection]
+    if not clauses:
+        return true()
+    return and_(*clauses)