You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/31 10:53:21 UTC

[airflow] branch master updated: Add metric for job start/end task run (#8680)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a3fc8be  Add metric for job start/end task run (#8680)
a3fc8be is described below

commit a3fc8be34e4b8ea12f4994fe096eae4556eeef52
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Sun May 31 03:52:20 2020 -0700

    Add metric for job start/end task run (#8680)
    
    Co-authored-by: Ace Haidrey <ah...@pinterest.com>
---
 airflow/models/taskinstance.py    |  3 +++
 docs/metrics.rst                  |  2 ++
 tests/models/test_taskinstance.py | 25 ++++++++++++++++++++++++-
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2a89713..cdfb6c1 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -963,6 +963,7 @@ class TaskInstance(Base, LoggingMixin):
 
         context = {}  # type: Dict
         actual_start_date = timezone.utcnow()
+        Stats.incr('ti.start.{}.{}'.format(task.dag_id, task.task_id))
         try:
             if not mark_success:
                 context = self.get_template_context()
@@ -1084,6 +1085,8 @@ class TaskInstance(Base, LoggingMixin):
         except (Exception, KeyboardInterrupt) as e:
             self.handle_failure(e, test_mode, context)
             raise
+        finally:
+            Stats.incr('ti.finish.{}.{}.{}'.format(task.dag_id, task.task_id, self.state))
 
         # Success callback
         try:
diff --git a/docs/metrics.rst b/docs/metrics.rst
index 6a564a5..c31f308 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -86,6 +86,8 @@ Name                                    Description
 ``scheduler.tasks.running``             Number of tasks running in executor
 ``scheduler.tasks.starving``            Number of tasks that cannot be scheduled because of no open slot in pool
 ``sla_email_notification_failure``      Number of failed SLA miss email notification attempts
+``ti.start.<dagid>.<taskid>``           Number of started task in a given dag. Similar to <job_name>_start but for task
+``ti.finish.<dagid>.<taskid>.<state>``  Number of completed task in a given dag. Similar to <job_name>_end but for task
 ======================================= ================================================================
 
 Gauges
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 52feaa5..7209d37 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -22,7 +22,7 @@ import time
 import unittest
 import urllib
 from typing import List, Optional, Union
-from unittest.mock import mock_open, patch
+from unittest.mock import call, mock_open, patch
 
 import pendulum
 import pytest
@@ -40,6 +40,7 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python import PythonOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.sensors.python import PythonSensor
+from airflow.stats import Stats
 from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
 from airflow.ti_deps.dependencies_states import RUNNABLE_STATES
 from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
@@ -1599,6 +1600,28 @@ class TestTaskInstance(unittest.TestCase):
         ti.refresh_from_db()
         self.assertEqual(ti.state, State.SUCCESS)
 
+    @patch.object(Stats, 'incr')
+    def test_task_stats(self, stats_mock):
+        dag = DAG('test_task_start_end_stats', start_date=DEFAULT_DATE,
+                  end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+        op = DummyOperator(task_id='dummy_op', dag=dag)
+        dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=False)
+        ti = TI(task=op, execution_date=DEFAULT_DATE)
+        ti.state = State.RUNNING
+        session = settings.Session()
+        session.merge(ti)
+        session.commit()
+        ti._run_raw_task()
+        ti.refresh_from_db()
+        stats_mock.assert_called_with('ti.finish.{}.{}.{}'.format(dag.dag_id, op.task_id, ti.state))
+        self.assertIn(call('ti.start.{}.{}'.format(dag.dag_id, op.task_id)), stats_mock.mock_calls)
+        self.assertEqual(stats_mock.call_count, 5)
+
     def test_generate_command_default_param(self):
         dag_id = 'test_generate_command_default_param'
         task_id = 'task'