Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/14 08:51:31 UTC

[airflow] branch main updated: fix: continue checking sla (#26968)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e350b6d44d fix: continue checking sla (#26968)
e350b6d44d is described below

commit e350b6d44d6d34a1759b9c2d925b7d27532436ab
Author: doiken <61...@users.noreply.github.com>
AuthorDate: Wed Dec 14 17:51:23 2022 +0900

    fix: continue checking sla (#26968)
    
    Co-authored-by: doiken <do...@users.noreply.github.com>
---
 airflow/dag_processing/processor.py    |  2 +-
 tests/dag_processing/test_processor.py | 43 ++++++++++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fbb2bd298d..b763be81aa 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -427,7 +427,7 @@ class DagFileProcessor(LoggingMixin):
                 if next_info is None:
                     break
                 if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
-                    break
+                    continue
                 if next_info.logical_date + task.sla < ts:
 
                     sla_miss = SlaMiss(
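
For context, here is a minimal, self-contained sketch of why swapping `break` for `continue` matters in this loop. The helper find_sla_misses below is hypothetical and deliberately simplified (it is not the actual manage_slas code), but it mirrors the pattern: scan successive logical dates, skip dates that already have a recorded SlaMiss, and flag those whose deadline has passed.

    import datetime

    def find_sla_misses(logical_dates, recorded_slas, sla, ts):
        """Return logical dates whose SLA deadline has passed and that do not
        already have a recorded miss.

        recorded_slas: set of logical dates that already have an SlaMiss row
        sla: datetime.timedelta; ts: the current time
        """
        misses = []
        for logical_date in logical_dates:
            if logical_date in recorded_slas:
                # Previously this was a `break`, which stopped the scan and
                # silently dropped every later date; `continue` skips only
                # the already-recorded date.
                continue
            if logical_date + sla < ts:
                misses.append(logical_date)
        return misses

    if __name__ == "__main__":
        now = datetime.datetime(2022, 12, 14, 9, 0)
        day = datetime.timedelta(days=1)
        dates = [now - 3 * day, now - 2 * day, now - 1 * day]
        # The earliest date already has a recorded miss; with the old `break`,
        # the still-unrecorded miss from two days ago would never be flagged.
        recorded = {now - 3 * day}
        print(find_sla_misses(dates, recorded, sla=day, ts=now))
        # -> [datetime.datetime(2022, 12, 12, 9, 0)]
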
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c38b88d6e6..3601dd9a9a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -249,6 +249,49 @@ class TestDagFileProcessor:
         # ti is successful thereby trying to insert a duplicate record.
         dag_file_processor.manage_slas(dag=dag, session=session)
 
+    @mock.patch("airflow.dag_processing.processor.Stats.incr")
+    def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla(
+        self, mock_stats_incr, dag_maker
+    ):
+        """
+        Test that the dag file processor continues checking subsequent task instances
+        even if a preceding task instance has already missed its SLA
+        """
+        session = settings.Session()
+
+        # Create a dag with a start date of 3 days ago and an sla of 1 day,
+        # so we have 2 missed SLAs
+        now = timezone.utcnow()
+        test_start_date = now - datetime.timedelta(days=3)
+        with dag_maker(
+            dag_id="test_sla_miss",
+            default_args={"start_date": test_start_date, "sla": datetime.timedelta(days=1)},
+        ) as dag:
+            task = EmptyOperator(task_id="dummy")
+
+        dag_maker.create_dagrun(execution_date=test_start_date, state=State.SUCCESS)
+
+        session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success"))
+        session.merge(
+            SlaMiss(task_id=task.task_id, dag_id=dag.dag_id, execution_date=now - datetime.timedelta(days=2))
+        )
+        session.flush()
+
+        dag_file_processor = DagFileProcessor(
+            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
+        )
+        dag_file_processor.manage_slas(dag=dag, session=session)
+        sla_miss_count = (
+            session.query(SlaMiss)
+            .filter(
+                SlaMiss.dag_id == dag.dag_id,
+                SlaMiss.task_id == task.task_id,
+            )
+            .count()
+        )
+        assert sla_miss_count == 2
+        mock_stats_incr.assert_called_with("sla_missed")
+
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
     def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag):
         """