You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:48 UTC

[airflow] 07/27: Add retry to purge_inactive_dag_warnings (#28481)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 825d9fcac867d994a11414f95e988ca3dac1b790
Author: Michael Petro <40...@users.noreply.github.com>
AuthorDate: Thu Jan 5 00:58:07 2023 -0500

    Add retry to purge_inactive_dag_warnings (#28481)
    
    Co-authored-by: Ephraim Anierobi <sp...@gmail.com>
    (cherry picked from commit 5289938ec1d9011a9ff8625705cffd1708f9274d)
---
 airflow/models/dagwarning.py    |  6 ++++
 tests/models/test_dagwarning.py | 69 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+)

diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index fb77e42a9d..db93aafb0f 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -24,6 +24,7 @@ from sqlalchemy.orm import Session
 
 from airflow.models.base import Base, StringID
 from airflow.utils import timezone
+from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
@@ -72,6 +73,11 @@ class DagWarning(Base):
 
         :return: None
         """
+        cls._purge_inactive_dag_warnings_with_retry(session)
+
+    @classmethod
+    @retry_db_transaction
+    def _purge_inactive_dag_warnings_with_retry(cls, session: Session) -> None:
         from airflow.models.dag import DagModel
 
         if session.get_bind().dialect.name == "sqlite":
diff --git a/tests/models/test_dagwarning.py b/tests/models/test_dagwarning.py
new file mode 100644
index 0000000000..7ae2962b1f
--- /dev/null
+++ b/tests/models/test_dagwarning.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+from sqlalchemy.exc import OperationalError
+
+from airflow.models import DagModel
+from airflow.models.dagwarning import DagWarning
+from tests.test_utils.db import clear_db_dags
+
+
+class TestDagWarning:
+    def setup_method(self):
+        clear_db_dags()
+
+    def test_purge_inactive_dag_warnings(self, session):
+        """
+        Test that the purge_inactive_dag_warnings method deletes the warnings of inactive DAGs
+        """
+
+        dags = [DagModel(dag_id="dag_1", is_active=False), DagModel(dag_id="dag_2", is_active=True)]
+        session.add_all(dags)
+        session.commit()
+
+        dag_warnings = [
+            DagWarning("dag_1", "non-existent pool", "non-existent pool"),
+            DagWarning("dag_2", "non-existent pool", "non-existent pool"),
+        ]
+        session.add_all(dag_warnings)
+        session.commit()
+
+        DagWarning.purge_inactive_dag_warnings(session)
+
+        remaining_dag_warnings = session.query(DagWarning).all()
+        assert len(remaining_dag_warnings) == 1
+        assert remaining_dag_warnings[0].dag_id == "dag_2"
+
+    def test_retry_purge_inactive_dag_warnings(self):
+        """
+        Test that the purge_inactive_dag_warnings method calls the delete method twice
+        if the query raises an OperationalError on the first call and succeeds on the second attempt
+        """
+        self.session_mock = MagicMock()
+        self.delete_mock = MagicMock()
+        self.session_mock.query.return_value.filter.return_value.delete = self.delete_mock
+
+        self.delete_mock.side_effect = [OperationalError(None, None, "database timeout"), None]
+
+        DagWarning.purge_inactive_dag_warnings(self.session_mock)
+
+        # Assert that the delete method was called twice
+        assert self.delete_mock.call_count == 2