You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/04/22 12:05:34 UTC

[airflow] branch main updated: Initialize finished counter at zero (#23080)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b2ef88f87 Initialize finished counter at zero (#23080)
3b2ef88f87 is described below

commit 3b2ef88f877fc5e4fcbe8038f0a9251263eaafbc
Author: Bill Franklin <bi...@users.noreply.github.com>
AuthorDate: Fri Apr 22 13:05:13 2022 +0100

    Initialize finished counter at zero (#23080)
    
    Sets initial count of task finished state to zero.
    This enables acquiring the rate from zero to one
    (particularly useful if you want to alert on any failures).
    
    We're using the Prometheus statsd-exporter. Since counters
    are usually used with a PromQL function like `rate`, it's important
    that counters are initialized at zero, otherwise when a task
    finishes the rate function will not have a previous value to compare
    the state count to.
    
    For example, what we'd like to do:
    
    ```
    sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) >
    0
    ```
    
    This tells us the failure rate of tasks over time.
    
    What I've tried to do instead to ensure the metric captures the change
    from zero to one:
    
    ```
    (sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) > 0) or sum by (dag_id, task_id) (airflow_ti_finish{state='failed'} != 0 unless (airflow_ti_finish{state='failed'} offset 1m))
    ```
    
    Two useful posts on this subject:
    https://www.robustperception.io/why-predeclare-metrics
    https://www.section.io/blog/beware-prometheus-counters-that-do-not-begin-at-zero/
    
    Co-authored-by: Bill Franklin <b....@mwam.com>
---
 airflow/models/taskinstance.py    | 3 +++
 tests/models/test_taskinstance.py | 4 +++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 73f38e1a9d..412b06cd29 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1431,6 +1431,9 @@ class TaskInstance(Base, LoggingMixin):
             session.commit()
         actual_start_date = timezone.utcnow()
         Stats.incr(f'ti.start.{self.task.dag_id}.{self.task.task_id}')
+        # Initialize final state counters at zero
+        for state in State.task_states:
+            Stats.incr(f'ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}', count=0)
         try:
             if not mark_success:
                 self.task = self.task.prepare_for_execution()
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 2c52fd0eb6..d104e79835 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2020,8 +2020,10 @@ class TestTaskInstance:
         ti._run_raw_task()
         ti.refresh_from_db()
         stats_mock.assert_called_with(f'ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}')
+        for state in State.task_states:
+            assert call(f'ti.finish.{ti.dag_id}.{ti.task_id}.{state}', count=0) in stats_mock.mock_calls
         assert call(f'ti.start.{ti.dag_id}.{ti.task_id}') in stats_mock.mock_calls
-        assert stats_mock.call_count == 4
+        assert stats_mock.call_count == 19
 
     def test_command_as_list(self, create_task_instance):
         ti = create_task_instance()