You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/06/30 16:19:47 UTC
[airflow] branch main updated: Fix purge_inactive_dag_warnings filter (#24749)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fab2913d1a Fix purge_inactive_dag_warnings filter (#24749)
fab2913d1a is described below
commit fab2913d1aad0f076d35f3c424cb128016e08330
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Jul 1 00:19:38 2022 +0800
Fix purge_inactive_dag_warnings filter (#24749)
---
airflow/models/dagwarning.py | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index 68745ed645..de686196c5 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -18,6 +18,7 @@
from enum import Enum
from sqlalchemy import Column, ForeignKeyConstraint, String, Text, false
+from sqlalchemy.orm import Session
from airflow.models.base import Base, StringID
from airflow.utils import timezone
@@ -49,21 +50,21 @@ class DagWarning(Base):
),
)
- def __init__(self, dag_id, error_type, message, **kwargs):
+ def __init__(self, dag_id: str, error_type: str, message: str, **kwargs):
super().__init__(**kwargs)
self.dag_id = dag_id
self.warning_type = DagWarningType(error_type).value # make sure valid type
self.message = message
- def __eq__(self, other):
+ def __eq__(self, other) -> bool:
return self.dag_id == other.dag_id and self.warning_type == other.warning_type
- def __hash__(self):
+ def __hash__(self) -> int:
return hash((self.dag_id, self.warning_type))
@classmethod
@provide_session
- def purge_inactive_dag_warnings(cls, session=NEW_SESSION):
+ def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
"""
Deactivate DagWarning records for inactive dags.
@@ -72,12 +73,11 @@ class DagWarning(Base):
from airflow.models.dag import DagModel
if session.get_bind().dialect.name == 'sqlite':
- dag_ids = session.query(DagModel).filter(DagModel.is_active == false()).all()
- session.query(cls).filter(cls.dag_id.in_(dag_ids)).delete(synchronize_session=False)
+ dag_ids = session.query(DagModel.dag_id).filter(DagModel.is_active == false())
+ query = session.query(cls).filter(cls.dag_id.in_(dag_ids))
else:
- session.query(cls).filter(cls.dag_id == DagModel.dag_id, DagModel.is_active == false()).delete(
- synchronize_session=False
- )
+ query = session.query(cls).filter(cls.dag_id == DagModel.dag_id, DagModel.is_active == false())
+ query.delete(synchronize_session=False)
session.commit()