You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/07 18:57:05 UTC

[2/5] incubator-airflow git commit: [AIRFLOW-663] Improve time units for task performance charts

[AIRFLOW-663] Improve time units for task performance charts

The task duration and landing time charts display
time interval values in hours. This is not the
appropriate unit for tasks that execute on smaller
time scales (~minutes, ~seconds), and the chart
is unreadable in those cases.

This patch converts the time values to the
appropriate units and updates the y axis label to
show the unit of analysis.

Closes #1914 from vijaysbhat/chart_time_units


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6c5109bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6c5109bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6c5109bd

Branch: refs/heads/v1-8-test
Commit: 6c5109bdcd8a81eeb5654fe109c0d111f59e0d6b
Parents: 9a7801d
Author: Vijay Bhat <vi...@gmail.com>
Authored: Sat Jan 7 18:24:38 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 7 18:24:43 2017 +0100

----------------------------------------------------------------------
 airflow/utils/dates.py | 33 ++++++++++++++++++++++++
 airflow/www/views.py   | 61 ++++++++++++++++++++++++++++++++-------------
 tests/core.py          | 30 +++++++++++++++++++++-
 3 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index cd9aab5..84fd791 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -179,3 +179,36 @@ def round_time(dt, delta, start_date=datetime.min):
     # in the special case when start_date > dt the search for upper will
     # immediately stop for upper == 1 which results in lower = upper // 2 = 0
     # and this function returns start_date.
+
+
+def infer_time_unit(time_seconds_arr):
+    """
+    Determine the most appropriate time unit for an array of time durations
+    specified in seconds.
+
+    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
+    """
+    if len(time_seconds_arr) == 0:
+        return 'hours'
+    max_time_seconds = max(time_seconds_arr)
+    if max_time_seconds <= 60*2:
+        return 'seconds'
+    elif max_time_seconds <= 60*60*2:
+        return 'minutes'
+    elif max_time_seconds <= 24*60*60*2:
+        return 'hours'
+    else:
+        return 'days'
+
+
+def scale_time_units(time_seconds_arr, unit):
+    """
+    Convert an array of time durations in seconds to the specified time unit.
+    """
+    if unit == 'minutes':
+        return list(map(lambda x: x*1.0/60, time_seconds_arr))
+    elif unit == 'hours':
+        return list(map(lambda x: x*1.0/(60*60), time_seconds_arr))
+    elif unit == 'days':
+        return list(map(lambda x: x*1.0/(24*60*60), time_seconds_arr))
+    return time_seconds_arr

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7134264..5de6855 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 #
 
-from past.utils import old_div
 from past.builtins import basestring, unicode
 
 import os
@@ -73,6 +72,7 @@ from airflow.utils.state import State
 from airflow.utils.db import provide_session
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils import logging as log_utils
+from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.configuration import AirflowConfigException
@@ -1468,25 +1468,41 @@ class Airflow(BaseView):
         cum_chart = nvd3.lineChart(
             name="cumLineChart", x_is_date=True, height=600, width="1200")
 
+        y = {}
+        x = {}
+        cum_y = {}
         for task in dag.tasks:
-            y = []
-            x = []
-            cum_y = []
+            y[task.task_id] = []
+            x[task.task_id] = []
+            cum_y[task.task_id] = []
             for ti in task.get_task_instances(session, start_date=min_date,
                                               end_date=base_date):
                 if ti.duration:
                     dttm = wwwutils.epoch(ti.execution_date)
-                    x.append(dttm)
-                    y.append(float(ti.duration) / (60*60))
+                    x[ti.task_id].append(dttm)
+                    y[ti.task_id].append(float(ti.duration))
                     fails = session.query(models.TaskFail).filter_by(
                         task_id=ti.task_id,
                         dag_id=ti.dag_id,
                         execution_date=ti.execution_date).all()
                     fails_total = sum([f.duration for f in fails])
-                    cum_y.append(float(ti.duration + fails_total) / (60*60))
-            if x:
-                chart.add_serie(name=task.task_id, x=x, y=y)
-                cum_chart.add_serie(name=task.task_id, x=x, y=cum_y)
+                    cum_y[ti.task_id].append(float(ti.duration + fails_total))
+        # determine the most relevant time unit for the set of task instance
+        # durations for the DAG
+        y_unit = infer_time_unit([d for t in y.values() for d in t])
+        cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
+        # update the y Axis on both charts to have the correct time units
+        chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+                            label='Duration ({})'.format(y_unit))
+        cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+                                label='Duration ({})'.format(cum_y_unit))
+        for task in dag.tasks:
+            if x[task.task_id]:
+                chart.add_serie(name=task.task_id, x=x[task.task_id],
+                                y=scale_time_units(y[task.task_id], y_unit))
+                cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
+                                    y=scale_time_units(cum_y[task.task_id],
+                                    cum_y_unit))
 
         tis = dag.get_task_instances(
             session, start_date=min_date, end_date=base_date)
@@ -1607,9 +1623,11 @@ class Airflow(BaseView):
 
         chart = nvd3.lineChart(
             name="lineChart", x_is_date=True, height=600, width="1200")
+        y = {}
+        x = {}
         for task in dag.tasks:
-            y = []
-            x = []
+            y[task.task_id] = []
+            x[task.task_id] = []
             for ti in task.get_task_instances(session, start_date=min_date,
                                               end_date=base_date):
                 ts = ti.execution_date
@@ -1617,11 +1635,20 @@ class Airflow(BaseView):
                     ts = dag.following_schedule(ts)
                 if ti.end_date:
                     dttm = wwwutils.epoch(ti.execution_date)
-                    secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
-                    x.append(dttm)
-                    y.append(secs)
-            if x:
-                chart.add_serie(name=task.task_id, x=x, y=y)
+                    secs = (ti.end_date - ts).total_seconds()
+                    x[ti.task_id].append(dttm)
+                    y[ti.task_id].append(secs)
+
+        # determine the most relevant time unit for the set of landing times
+        # for the DAG
+        y_unit = infer_time_unit([d for t in y.values() for d in t])
+        # update the y Axis to have the correct time units
+        chart.create_y_axis('yAxis', format='.02f', custom_format=False,
+                            label='Landing Time ({})'.format(y_unit))
+        for task in dag.tasks:
+            if x[task.task_id]:
+                chart.add_serie(name=task.task_id, x=x[task.task_id],
+                                y=scale_time_units(y[task.task_id], y_unit))
 
         tis = dag.get_task_instances(
                 session, start_date=min_date, end_date=base_date)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c5109bd/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index bcc7840..6fb735f 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -21,6 +21,7 @@ import re
 import unittest
 import multiprocessing
 import mock
+from numpy.testing import assert_array_almost_equal
 import tempfile
 from datetime import datetime, time, timedelta
 from email.mime.multipart import MIMEMultipart
@@ -54,7 +55,7 @@ from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
 from airflow.utils.state import State
-from airflow.utils.dates import round_time
+from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
 from airflow.utils.logging import LoggingMixin
 from lxml import html
 from airflow.exceptions import AirflowException
@@ -810,6 +811,33 @@ class CoreTest(unittest.TestCase):
             2015, 9, 14, 0, 0))
         assert rt6 == datetime(2015, 9, 14, 0, 0)
 
+    def test_infer_time_unit(self):
+
+        assert infer_time_unit([130, 5400, 10]) == 'minutes'
+
+        assert infer_time_unit([110, 50, 10, 100]) == 'seconds'
+
+        assert infer_time_unit([100000, 50000, 10000, 20000]) == 'hours'
+
+        assert infer_time_unit([200000, 100000]) == 'days'
+
+    def test_scale_time_units(self):
+
+        # use assert_array_almost_equal from numpy.testing since we are
+        # comparing floating point arrays
+        arr1 = scale_time_units([130, 5400, 10], 'minutes')
+        assert_array_almost_equal(arr1, [2.167, 90.0, 0.167], decimal=3)
+
+        arr2 = scale_time_units([110, 50, 10, 100], 'seconds')
+        assert_array_almost_equal(arr2, [110.0, 50.0, 10.0, 100.0], decimal=3)
+
+        arr3 = scale_time_units([100000, 50000, 10000, 20000], 'hours')
+        assert_array_almost_equal(arr3, [27.778, 13.889, 2.778, 5.556],
+                                  decimal=3)
+
+        arr4 = scale_time_units([200000, 100000], 'days')
+        assert_array_almost_equal(arr4, [2.315, 1.157], decimal=3)
+
     def test_duplicate_dependencies(self):
 
         regexp = "Dependency (.*)runme_0(.*)run_after_loop(.*) " \