You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/31 10:53:21 UTC
[airflow] branch master updated: Add metric for job start/end task run (#8680)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a3fc8be Add metric for job start/end task run (#8680)
a3fc8be is described below
commit a3fc8be34e4b8ea12f4994fe096eae4556eeef52
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Sun May 31 03:52:20 2020 -0700
Add metric for job start/end task run (#8680)
Co-authored-by: Ace Haidrey <ah...@pinterest.com>
---
airflow/models/taskinstance.py | 3 +++
docs/metrics.rst | 2 ++
tests/models/test_taskinstance.py | 25 ++++++++++++++++++++++++-
3 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2a89713..cdfb6c1 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -963,6 +963,7 @@ class TaskInstance(Base, LoggingMixin):
context = {} # type: Dict
actual_start_date = timezone.utcnow()
+ Stats.incr('ti.start.{}.{}'.format(task.dag_id, task.task_id))
try:
if not mark_success:
context = self.get_template_context()
@@ -1084,6 +1085,8 @@ class TaskInstance(Base, LoggingMixin):
except (Exception, KeyboardInterrupt) as e:
self.handle_failure(e, test_mode, context)
raise
+ finally:
+ Stats.incr('ti.finish.{}.{}.{}'.format(task.dag_id, task.task_id, self.state))
# Success callback
try:
diff --git a/docs/metrics.rst b/docs/metrics.rst
index 6a564a5..c31f308 100644
--- a/docs/metrics.rst
+++ b/docs/metrics.rst
@@ -86,6 +86,8 @@ Name Description
``scheduler.tasks.running`` Number of tasks running in executor
``scheduler.tasks.starving`` Number of tasks that cannot be scheduled because of no open slot in pool
``sla_email_notification_failure`` Number of failed SLA miss email notification attempts
+``ti.start.<dagid>.<taskid>`` Number of tasks started in a given DAG. Similar to <job_name>_start, but for tasks
+``ti.finish.<dagid>.<taskid>.<state>`` Number of tasks completed in a given DAG. Similar to <job_name>_end, but for tasks
======================================= ================================================================
Gauges
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 52feaa5..7209d37 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -22,7 +22,7 @@ import time
import unittest
import urllib
from typing import List, Optional, Union
-from unittest.mock import mock_open, patch
+from unittest.mock import call, mock_open, patch
import pendulum
import pytest
@@ -40,6 +40,7 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.python import PythonSensor
+from airflow.stats import Stats
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.ti_deps.dependencies_states import RUNNABLE_STATES
from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
@@ -1599,6 +1600,28 @@ class TestTaskInstance(unittest.TestCase):
ti.refresh_from_db()
self.assertEqual(ti.state, State.SUCCESS)
+ @patch.object(Stats, 'incr')
+ def test_task_stats(self, stats_mock):
+ dag = DAG('test_task_start_end_stats', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ op = DummyOperator(task_id='dummy_op', dag=dag)
+ dag.create_dagrun(
+ run_id='manual__' + DEFAULT_DATE.isoformat(),
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ external_trigger=False)
+ ti = TI(task=op, execution_date=DEFAULT_DATE)
+ ti.state = State.RUNNING
+ session = settings.Session()
+ session.merge(ti)
+ session.commit()
+ ti._run_raw_task()
+ ti.refresh_from_db()
+ stats_mock.assert_called_with('ti.finish.{}.{}.{}'.format(dag.dag_id, op.task_id, ti.state))
+ self.assertIn(call('ti.start.{}.{}'.format(dag.dag_id, op.task_id)), stats_mock.mock_calls)
+ self.assertEqual(stats_mock.call_count, 5)
+
def test_generate_command_default_param(self):
dag_id = 'test_generate_command_default_param'
task_id = 'task'