You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:48 UTC
[airflow] 07/27: Add retry to purge_inactive_dag_warnings (#28481)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 825d9fcac867d994a11414f95e988ca3dac1b790
Author: Michael Petro <40...@users.noreply.github.com>
AuthorDate: Thu Jan 5 00:58:07 2023 -0500
Add retry to purge_inactive_dag_warnings (#28481)
Co-authored-by: Ephraim Anierobi <sp...@gmail.com>
(cherry picked from commit 5289938ec1d9011a9ff8625705cffd1708f9274d)
---
airflow/models/dagwarning.py | 6 ++++
tests/models/test_dagwarning.py | 69 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 75 insertions(+)
diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index fb77e42a9d..db93aafb0f 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -24,6 +24,7 @@ from sqlalchemy.orm import Session
from airflow.models.base import Base, StringID
from airflow.utils import timezone
+from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -72,6 +73,11 @@ class DagWarning(Base):
:return: None
"""
+ cls._purge_inactive_dag_warnings_with_retry(session)
+
+ @classmethod
+ @retry_db_transaction
+ def _purge_inactive_dag_warnings_with_retry(cls, session: Session) -> None:
from airflow.models.dag import DagModel
if session.get_bind().dialect.name == "sqlite":
diff --git a/tests/models/test_dagwarning.py b/tests/models/test_dagwarning.py
new file mode 100644
index 0000000000..7ae2962b1f
--- /dev/null
+++ b/tests/models/test_dagwarning.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+from sqlalchemy.exc import OperationalError
+
+from airflow.models import DagModel
+from airflow.models.dagwarning import DagWarning
+from tests.test_utils.db import clear_db_dags
+
+
+class TestDagWarning:
+ def setup_method(self):
+ clear_db_dags()
+
+ def test_purge_inactive_dag_warnings(self, session):
+ """
+ Test that purge_inactive_dag_warnings deletes warnings of inactive DAGs and keeps those of active DAGs
+ """
+
+ dags = [DagModel(dag_id="dag_1", is_active=False), DagModel(dag_id="dag_2", is_active=True)]
+ session.add_all(dags)
+ session.commit()
+
+ dag_warnings = [
+ DagWarning("dag_1", "non-existent pool", "non-existent pool"),
+ DagWarning("dag_2", "non-existent pool", "non-existent pool"),
+ ]
+ session.add_all(dag_warnings)
+ session.commit()
+
+ DagWarning.purge_inactive_dag_warnings(session)
+
+ remaining_dag_warnings = session.query(DagWarning).all()
+ assert len(remaining_dag_warnings) == 1
+ assert remaining_dag_warnings[0].dag_id == "dag_2"
+
+ def test_retry_purge_inactive_dag_warnings(self):
+ """
+ Test that the purge_inactive_dag_warnings method calls the delete method twice
+ if the query raises an OperationalError on the first call and succeeds on the second attempt
+ """
+ self.session_mock = MagicMock()
+ self.delete_mock = MagicMock()
+ self.session_mock.query.return_value.filter.return_value.delete = self.delete_mock
+
+ self.delete_mock.side_effect = [OperationalError(None, None, "database timeout"), None]  # raise once, then succeed
+
+ DagWarning.purge_inactive_dag_warnings(self.session_mock)
+
+ # One failed attempt followed by one successful retry: delete must have been called exactly twice
+ assert self.delete_mock.call_count == 2