You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/06/30 16:19:47 UTC

[airflow] branch main updated: Fix purge_inactive_dag_warnings filter (#24749)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fab2913d1a Fix purge_inactive_dag_warnings filter (#24749)
fab2913d1a is described below

commit fab2913d1aad0f076d35f3c424cb128016e08330
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Jul 1 00:19:38 2022 +0800

    Fix purge_inactive_dag_warnings filter (#24749)
---
 airflow/models/dagwarning.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index 68745ed645..de686196c5 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -18,6 +18,7 @@
 from enum import Enum
 
 from sqlalchemy import Column, ForeignKeyConstraint, String, Text, false
+from sqlalchemy.orm import Session
 
 from airflow.models.base import Base, StringID
 from airflow.utils import timezone
@@ -49,21 +50,21 @@ class DagWarning(Base):
         ),
     )
 
-    def __init__(self, dag_id, error_type, message, **kwargs):
+    def __init__(self, dag_id: str, error_type: str, message: str, **kwargs):
         super().__init__(**kwargs)
         self.dag_id = dag_id
         self.warning_type = DagWarningType(error_type).value  # make sure valid type
         self.message = message
 
-    def __eq__(self, other):
+    def __eq__(self, other) -> bool:
         return self.dag_id == other.dag_id and self.warning_type == other.warning_type
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash((self.dag_id, self.warning_type))
 
     @classmethod
     @provide_session
-    def purge_inactive_dag_warnings(cls, session=NEW_SESSION):
+    def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
         """
         Deactivate DagWarning records for inactive dags.
 
@@ -72,12 +73,11 @@ class DagWarning(Base):
         from airflow.models.dag import DagModel
 
         if session.get_bind().dialect.name == 'sqlite':
-            dag_ids = session.query(DagModel).filter(DagModel.is_active == false()).all()
-            session.query(cls).filter(cls.dag_id.in_(dag_ids)).delete(synchronize_session=False)
+            dag_ids = session.query(DagModel.dag_id).filter(DagModel.is_active == false())
+            query = session.query(cls).filter(cls.dag_id.in_(dag_ids))
         else:
-            session.query(cls).filter(cls.dag_id == DagModel.dag_id, DagModel.is_active == false()).delete(
-                synchronize_session=False
-            )
+            query = session.query(cls).filter(cls.dag_id == DagModel.dag_id, DagModel.is_active == false())
+        query.delete(synchronize_session=False)
         session.commit()