You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/14 08:51:31 UTC
[airflow] branch main updated: fix: continue checking sla (#26968)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e350b6d44d fix: continue checking sla (#26968)
e350b6d44d is described below
commit e350b6d44d6d34a1759b9c2d925b7d27532436ab
Author: doiken <61...@users.noreply.github.com>
AuthorDate: Wed Dec 14 17:51:23 2022 +0900
fix: continue checking sla (#26968)
Co-authored-by: doiken <do...@users.noreply.github.com>
---
airflow/dag_processing/processor.py | 2 +-
tests/dag_processing/test_processor.py | 43 ++++++++++++++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fbb2bd298d..b763be81aa 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -427,7 +427,7 @@ class DagFileProcessor(LoggingMixin):
if next_info is None:
break
if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
- break
+ continue
if next_info.logical_date + task.sla < ts:
sla_miss = SlaMiss(
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c38b88d6e6..3601dd9a9a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -249,6 +249,49 @@ class TestDagFileProcessor:
# ti is successful thereby trying to insert a duplicate record.
dag_file_processor.manage_slas(dag=dag, session=session)
+ @mock.patch("airflow.dag_processing.processor.Stats.incr")
+ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla(
+ self, mock_stats_incr, dag_maker
+ ):
+ """
+ Test that the dag file processor continues checking subsequent task instances
+ even if an SLA miss has already been recorded for a preceding one
+ """
+ session = settings.Session()
+
+ # Create a dag with a start of 3 days ago and sla of 1 day,
+ # so we have 2 missing slas
+ now = timezone.utcnow()
+ test_start_date = now - datetime.timedelta(days=3)
+ with dag_maker(
+ dag_id="test_sla_miss",
+ default_args={"start_date": test_start_date, "sla": datetime.timedelta(days=1)},
+ ) as dag:
+ task = EmptyOperator(task_id="dummy")
+
+ dag_maker.create_dagrun(execution_date=test_start_date, state=State.SUCCESS)
+
+ session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success"))
+ session.merge(
+ SlaMiss(task_id=task.task_id, dag_id=dag.dag_id, execution_date=now - datetime.timedelta(days=2))
+ )
+ session.flush()
+
+ dag_file_processor = DagFileProcessor(
+ dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
+ )
+ dag_file_processor.manage_slas(dag=dag, session=session)
+ sla_miss_count = (
+ session.query(SlaMiss)
+ .filter(
+ SlaMiss.dag_id == dag.dag_id,
+ SlaMiss.task_id == task.task_id,
+ )
+ .count()
+ )
+ assert sla_miss_count == 2
+ mock_stats_incr.assert_called_with("sla_missed")
+
@mock.patch("airflow.dag_processing.processor.Stats.incr")
def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag):
"""