You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/07 18:57:05 UTC
[2/5] incubator-airflow git commit: [AIRFLOW-663] Improve time units
for task performance charts
[AIRFLOW-663] Improve time units for task performance charts
The task duration and landing time charts display
time interval values in hours. This is not an
appropriate unit for tasks that execute on shorter
time scales (minutes or seconds), and the charts
are unreadable in those cases.
This patch converts the time values to the most
appropriate unit and updates the y-axis label to
show that unit.
Closes #1914 from vijaysbhat/chart_time_units
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6c5109bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6c5109bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6c5109bd
Branch: refs/heads/v1-8-test
Commit: 6c5109bdcd8a81eeb5654fe109c0d111f59e0d6b
Parents: 9a7801d
Author: Vijay Bhat <vi...@gmail.com>
Authored: Sat Jan 7 18:24:38 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 7 18:24:43 2017 +0100
----------------------------------------------------------------------
airflow/utils/dates.py | 33 ++++++++++++++++++++++++
airflow/www/views.py | 61 ++++++++++++++++++++++++++++++++-------------
tests/core.py | 30 +++++++++++++++++++++-
3 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index cd9aab5..84fd791 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -179,3 +179,36 @@ def round_time(dt, delta, start_date=datetime.min):
# in the special case when start_date > dt the search for upper will
# immediately stop for upper == 1 which results in lower = upper // 2 = 0
# and this function returns start_date.
+
+
def infer_time_unit(time_seconds_arr):
    """
    Determine the most appropriate time unit for an array of time durations
    specified in seconds.

    The unit is chosen from the largest magnitude in the input. A value of
    up to two of a unit is still expressed in the next smaller unit:

    * up to 2 minutes (120 s)    -> 'seconds'
    * up to 2 hours (7200 s)     -> 'minutes'
    * up to 2 days (172800 s)    -> 'hours'
    * anything larger            -> 'days'

    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'

    :param time_seconds_arr: iterable of durations in seconds. Values may be
        negative (e.g. a landing time before the schedule closed); the
        absolute value is used when choosing the unit.
    :return: one of 'seconds', 'minutes', 'hours' or 'days'; 'hours' when the
        input is empty.
    """
    # Materialize once so generators and other one-shot iterables work, and
    # use magnitudes so negative values do not force a too-small unit.
    magnitudes = [abs(t) for t in time_seconds_arr]
    if not magnitudes:
        return 'hours'
    max_time_seconds = max(magnitudes)
    if max_time_seconds <= 60 * 2:
        return 'seconds'
    elif max_time_seconds <= 60 * 60 * 2:
        return 'minutes'
    elif max_time_seconds <= 24 * 60 * 60 * 2:
        return 'hours'
    else:
        return 'days'
+
+
def scale_time_units(time_seconds_arr, unit):
    """
    Convert an array of time durations in seconds to the specified time unit.

    :param time_seconds_arr: iterable of durations in seconds.
    :param unit: one of 'seconds', 'minutes', 'hours' or 'days' (the values
        returned by ``infer_time_unit``). Any other value is treated like
        'seconds', i.e. the durations are left unscaled.
    :return: a new list of float durations expressed in ``unit``. A fresh
        list is returned for every unit, so the caller's input is never
        aliased by the result.
    """
    seconds_per_unit = {
        'minutes': 60,
        'hours': 60 * 60,
        'days': 24 * 60 * 60,
    }.get(unit, 1)
    # Multiplying by 1.0 forces float division on Python 2 as well.
    return [t * 1.0 / seconds_per_unit for t in time_seconds_arr]
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7134264..5de6855 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -13,7 +13,6 @@
# limitations under the License.
#
-from past.utils import old_div
from past.builtins import basestring, unicode
import os
@@ -73,6 +72,7 @@ from airflow.utils.state import State
from airflow.utils.db import provide_session
from airflow.utils.helpers import alchemy_to_dict
from airflow.utils import logging as log_utils
+from airflow.utils.dates import infer_time_unit, scale_time_units
from airflow.www import utils as wwwutils
from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
from airflow.configuration import AirflowConfigException
@@ -1468,25 +1468,41 @@ class Airflow(BaseView):
cum_chart = nvd3.lineChart(
name="cumLineChart", x_is_date=True, height=600, width="1200")
+ y = {}
+ x = {}
+ cum_y = {}
for task in dag.tasks:
- y = []
- x = []
- cum_y = []
+ y[task.task_id] = []
+ x[task.task_id] = []
+ cum_y[task.task_id] = []
for ti in task.get_task_instances(session, start_date=min_date,
end_date=base_date):
if ti.duration:
dttm = wwwutils.epoch(ti.execution_date)
- x.append(dttm)
- y.append(float(ti.duration) / (60*60))
+ x[ti.task_id].append(dttm)
+ y[ti.task_id].append(float(ti.duration))
fails = session.query(models.TaskFail).filter_by(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=ti.execution_date).all()
fails_total = sum([f.duration for f in fails])
- cum_y.append(float(ti.duration + fails_total) / (60*60))
- if x:
- chart.add_serie(name=task.task_id, x=x, y=y)
- cum_chart.add_serie(name=task.task_id, x=x, y=cum_y)
+ cum_y[ti.task_id].append(float(ti.duration + fails_total))
+ # determine the most relevant time unit for the set of task instance
+ # durations for the DAG
+ y_unit = infer_time_unit([d for t in y.values() for d in t])
+ cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
+ # update the y Axis on both charts to have the correct time units
+ chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+ label='Duration ({})'.format(y_unit))
+ cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+ label='Duration ({})'.format(cum_y_unit))
+ for task in dag.tasks:
+ if x[task.task_id]:
+ chart.add_serie(name=task.task_id, x=x[task.task_id],
+ y=scale_time_units(y[task.task_id], y_unit))
+ cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
+ y=scale_time_units(cum_y[task.task_id],
+ cum_y_unit))
tis = dag.get_task_instances(
session, start_date=min_date, end_date=base_date)
@@ -1607,9 +1623,11 @@ class Airflow(BaseView):
chart = nvd3.lineChart(
name="lineChart", x_is_date=True, height=600, width="1200")
+ y = {}
+ x = {}
for task in dag.tasks:
- y = []
- x = []
+ y[task.task_id] = []
+ x[task.task_id] = []
for ti in task.get_task_instances(session, start_date=min_date,
end_date=base_date):
ts = ti.execution_date
@@ -1617,11 +1635,20 @@ class Airflow(BaseView):
ts = dag.following_schedule(ts)
if ti.end_date:
dttm = wwwutils.epoch(ti.execution_date)
- secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
- x.append(dttm)
- y.append(secs)
- if x:
- chart.add_serie(name=task.task_id, x=x, y=y)
+ secs = (ti.end_date - ts).total_seconds()
+ x[ti.task_id].append(dttm)
+ y[ti.task_id].append(secs)
+
+ # determine the most relevant time unit for the set of landing times
+ # for the DAG
+ y_unit = infer_time_unit([d for t in y.values() for d in t])
+ # update the y Axis to have the correct time units
+ chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+ label='Landing Time ({})'.format(y_unit))
+ for task in dag.tasks:
+ if x[task.task_id]:
+ chart.add_serie(name=task.task_id, x=x[task.task_id],
+ y=scale_time_units(y[task.task_id], y_unit))
tis = dag.get_task_instances(
session, start_date=min_date, end_date=base_date)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index bcc7840..6fb735f 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -21,6 +21,7 @@ import re
import unittest
import multiprocessing
import mock
+from numpy.testing import assert_array_almost_equal
import tempfile
from datetime import datetime, time, timedelta
from email.mime.multipart import MIMEMultipart
@@ -54,7 +55,7 @@ from airflow.bin import cli
from airflow.www import app as application
from airflow.settings import Session
from airflow.utils.state import State
-from airflow.utils.dates import round_time
+from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
from airflow.utils.logging import LoggingMixin
from lxml import html
from airflow.exceptions import AirflowException
@@ -810,6 +811,33 @@ class CoreTest(unittest.TestCase):
2015, 9, 14, 0, 0))
assert rt6 == datetime(2015, 9, 14, 0, 0)
def test_infer_time_unit(self):
    """infer_time_unit picks its unit from the largest duration given."""
    # Typical inputs.
    self.assertEqual(infer_time_unit([130, 5400, 10]), 'minutes')
    self.assertEqual(infer_time_unit([110, 50, 10, 100]), 'seconds')
    self.assertEqual(infer_time_unit([100000, 50000, 10000, 20000]), 'hours')
    self.assertEqual(infer_time_unit([200000, 100000]), 'days')

    # Thresholds are inclusive: exactly two of a unit stays in the smaller
    # unit, anything above moves to the next one.
    self.assertEqual(infer_time_unit([120]), 'seconds')
    self.assertEqual(infer_time_unit([121]), 'minutes')
    self.assertEqual(infer_time_unit([7200]), 'minutes')
    self.assertEqual(infer_time_unit([172800]), 'hours')
    self.assertEqual(infer_time_unit([172801]), 'days')

    # An empty input falls back to hours.
    self.assertEqual(infer_time_unit([]), 'hours')
+
def test_scale_time_units(self):
    """scale_time_units converts second durations into the requested unit."""
    # use assert_array_almost_equal from numpy.testing since we are
    # comparing floating point arrays
    arr1 = scale_time_units([130, 5400, 10], 'minutes')
    assert_array_almost_equal(arr1, [2.167, 90.0, 0.167], decimal=3)

    arr2 = scale_time_units([110, 50, 10, 100], 'seconds')
    assert_array_almost_equal(arr2, [110.0, 50.0, 10.0, 100.0], decimal=3)

    arr3 = scale_time_units([100000, 50000, 10000, 20000], 'hours')
    assert_array_almost_equal(arr3, [27.778, 13.889, 2.778, 5.556],
                              decimal=3)

    arr4 = scale_time_units([200000, 100000], 'days')
    assert_array_almost_equal(arr4, [2.315, 1.157], decimal=3)

    # an empty input scales to an empty result for every unit
    for unit in ('seconds', 'minutes', 'hours', 'days'):
        self.assertEqual(list(scale_time_units([], unit)), [])
+
def test_duplicate_dependencies(self):
regexp = "Dependency (.*)runme_0(.*)run_after_loop(.*) " \