Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/22 23:13:18 UTC

[incubator-sdap-nexus] branch master updated: SDAP-232: Add performance metrics logging. (#98)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c3d5d8  SDAP-232: Add performance metrics logging. (#98)
5c3d5d8 is described below

commit 5c3d5d82b9817499ce2af9161402272f7d3049fd
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Jun 22 16:13:08 2020 -0700

    SDAP-232: Add performance metrics logging. (#98)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 analysis/setup.py                                  |  3 +-
 analysis/webservice/NexusHandler.py                | 39 +++++++--
 .../webservice/algorithms_spark/HofMoellerSpark.py | 42 ++++++++--
 .../webservice/algorithms_spark/TimeAvgMapSpark.py | 93 +++++++++++++--------
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 42 ++++++----
 analysis/webservice/metrics/MetricsField.py        | 34 ++++++++
 analysis/webservice/metrics/MetricsRecord.py       | 32 ++++++++
 analysis/webservice/metrics/__init__.py            |  2 +
 data-access/nexustiles/nexustiles.py               | 94 +++++++++++++++-------
 docker/nexus-webapp/Dockerfile                     |  8 +-
 10 files changed, 292 insertions(+), 97 deletions(-)
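
For reference, the new webservice.metrics package (added in full in the patch
below) is small: a MetricsRecord wraps an ordered list of MetricsField objects
and exposes record_metrics(**kwargs) and print_metrics(logger). A minimal
sketch of how a handler can use it outside of Spark (the Spark-accumulator
fields built by SparkHandler._create_metrics_record() require a SparkContext
and are omitted here; assumes the analysis package is on PYTHONPATH):

    import logging
    from datetime import datetime

    from webservice.metrics import MetricsRecord, NumberMetricsField

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger('metrics-example')

    record = MetricsRecord([
        NumberMetricsField(key='reduce', description='Actual time to reduce results'),
        NumberMetricsField(key='actual_time', description='Total (actual) time')
    ])

    start = datetime.now()
    # ... reduce work happens here ...
    record.record_metrics(reduce=0.5)  # adds 0.5 to the 'reduce' field; unknown keys are ignored
    record.record_metrics(actual_time=(datetime.now() - start).total_seconds())
    record.print_metrics(log)  # logs one "description: value" line per non-zero field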

diff --git a/analysis/setup.py b/analysis/setup.py
index 57089fc..d6d6803 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -40,7 +40,8 @@ setuptools.setup(
         'webservice',
         'webservice.algorithms',
         'webservice.algorithms.doms',
-        'webservice.algorithms_spark'
+        'webservice.algorithms_spark',
+        'webservice.metrics'
     ],
     package_data={
         'webservice': ['config/web.ini', 'config/algorithms.ini'],
diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index fd3cb4b..87d2af7 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -21,7 +21,8 @@ import types
 import numpy as np
 from netCDF4 import Dataset
 from nexustiles.nexustiles import NexusTileService
-
+from webservice.metrics.MetricsField import NumberMetricsField, SparkAccumulatorMetricsField
+from webservice.metrics.MetricsRecord import MetricsRecord
 from webservice.webmodel import NexusProcessingException
 
 AVAILABLE_HANDLERS = []
@@ -329,10 +330,10 @@ class SparkHandler(NexusHandler):
         self._endYear = end_year
         self._climMonth = clim_month
         self._fill = fill
-        
+
     def _set_info_from_tile_set(self, nexus_tiles):
         ntiles = len(nexus_tiles)
-        self.log.debug('Attempting to extract info from {0} tiles'.\
+        self.log.debug('Attempting to extract info from {0} tiles'. \
                        format(ntiles))
         status = False
         self._latRes = None
@@ -350,7 +351,7 @@ class SparkHandler(NexusHandler):
                 if (len(lons) > 1):
                     self._lonRes = abs(lons[1] - lons[0])
             if ((self._latRes is not None) and
-                (self._lonRes is not None)):
+                    (self._lonRes is not None)):
                 lats_agg = np.concatenate([tile.latitudes.compressed()
                                            for tile in nexus_tiles])
                 lons_agg = np.concatenate([tile.longitudes.compressed()
@@ -367,7 +368,7 @@ class SparkHandler(NexusHandler):
                 break
         return status
 
-    def _find_global_tile_set(self):
+    def _find_global_tile_set(self, metrics_callback=None):
         # This only works for a single dataset.  If more than one is provided,
         # we use the first one and ignore the rest.
         if type(self._ds) in (list, tuple):
@@ -382,7 +383,8 @@ class SparkHandler(NexusHandler):
                                                                self._maxLon,
                                                                ds,
                                                                self._startTime,
-                                                               self._endTime)
+                                                               self._endTime,
+                                                               metrics_callback=metrics_callback)
 
         # Empty tile set will be returned upon failure to find the global
         # tile set.
@@ -391,7 +393,9 @@ class SparkHandler(NexusHandler):
         # Check one time stamp at a time and attempt to extract the global
         # tile set.
         for t in t_in_range:
-            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=ds, start_time=t, end_time=t)
+            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon,
+                                                                      self._maxLon, ds=ds, start_time=t, end_time=t,
+                                                                      metrics_callback=metrics_callback)
             if self._set_info_from_tile_set(nexus_tiles):
                 # Successfully retrieved global tile set from nexus_tiles,
                 # so no need to check any other time stamps.
@@ -581,6 +585,27 @@ class SparkHandler(NexusHandler):
                              max_parallelism)
         return num_partitions
 
+    def _create_metrics_record(self):
+        return MetricsRecord([
+            SparkAccumulatorMetricsField(key='num_tiles',
+                                         description='Number of tiles fetched',
+                                         accumulator=self._sc.accumulator(0)),
+            SparkAccumulatorMetricsField(key='partitions',
+                                         description='Number of Spark partitions',
+                                         accumulator=self._sc.accumulator(0)),
+            SparkAccumulatorMetricsField(key='cassandra',
+                                         description='Cumulative time to fetch data from Cassandra',
+                                         accumulator=self._sc.accumulator(0)),
+            SparkAccumulatorMetricsField(key='solr',
+                                         description='Cumulative time to fetch data from Solr',
+                                         accumulator=self._sc.accumulator(0)),
+            SparkAccumulatorMetricsField(key='calculation',
+                                         description='Cumulative time to do calculations',
+                                         accumulator=self._sc.accumulator(0)),
+            NumberMetricsField(key='reduce', description='Actual time to reduce results'),
+            NumberMetricsField(key="actual_time", description="Total (actual) time")
+        ])
+
 
 def executeInitializers(config):
     [wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 1696732..5fd01ba 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -17,6 +17,7 @@ import itertools
 import logging
 from cStringIO import StringIO
 from datetime import datetime
+from functools import partial
 
 import matplotlib.pyplot as plt
 import mpld3
@@ -26,7 +27,6 @@ from matplotlib import cm
 from matplotlib.ticker import FuncFormatter
 from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
-
 from webservice.NexusHandler import SparkHandler, nexus_handler
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
 
@@ -40,7 +40,7 @@ LONGITUDE = 1
 
 class HofMoellerCalculator(object):
     @staticmethod
-    def hofmoeller_stats(tile_in_spark):
+    def hofmoeller_stats(metrics_callback, tile_in_spark):
 
         (latlon, tile_id, index,
          min_lat, max_lat, min_lon, max_lon) = tile_in_spark
@@ -48,7 +48,8 @@ class HofMoellerCalculator(object):
         tile_service = NexusTileService()
         try:
             # Load the dataset tile
-            tile = tile_service.find_tile_by_id(tile_id)[0]
+            tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
+            calculation_start = datetime.now()
             # Mask it to the search domain
             tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
                                                    min_lon, max_lon, [tile])[0]
@@ -93,6 +94,9 @@ class HofMoellerCalculator(object):
                                               np.max(vals).item(),
                                               np.min(vals).item(),
                                               np.var(vals).item())))
+        calculation_duration = (datetime.now() - calculation_start).total_seconds()
+        metrics_callback(calculation=calculation_duration)
+
         return stats
 
 
@@ -261,7 +265,8 @@ def hof_tuple_to_dict(t, avg_var_name):
             'max': t[6],
             'min': t[7]}
 
-def spark_driver(sc, latlon, nexus_tiles_spark):
+
+def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback):
     # Parallelize list of tile ids
     rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
     if latlon == 0:
@@ -276,10 +281,12 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
     # Create a set of key-value pairs where the key is (time, lat|lon) and
     # the value is a tuple of intermediate statistics for the specified
     # coordinate within a single NEXUS tile.
-    results = rdd.flatMap(HofMoellerCalculator.hofmoeller_stats)
+    metrics_callback(partitions=rdd.getNumPartitions())
+    results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback))
 
     # Combine tuples across tiles with input key = (time, lat|lon)
     # Output a key value pair with key = (time)
+    reduce_start = datetime.now()
     results = results.combineByKey(lambda val: (hof_tuple_time(val), val),
                                    lambda x, val: (hof_tuple_time(x),
                                                    hof_tuple_combine(x[1],
@@ -317,6 +324,9 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
         values(). \
         collect()
 
+    reduce_duration = (datetime.now() - reduce_start).total_seconds()
+    metrics_callback(reduce=reduce_duration)
+
     return results
 
 
@@ -335,18 +345,22 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     def calc(self, compute_options, **args):
         ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
 
+        metrics_record = self._create_metrics_record()
+        calculation_start = datetime.now()
+
         min_lon, min_lat, max_lon, max_lat = bbox.bounds
 
         nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
                              enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
                                                                             ds, start_time, end_time,
+                                                                            metrics_callback=metrics_record.record_metrics,
                                                                             fetch_data=False))]
 
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -359,6 +373,11 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
         result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
                                     minLat=min_lat, maxLat=max_lat, minLon=min_lon,
                                     maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
+
+        duration = (datetime.now() - calculation_start).total_seconds()
+        metrics_record.record_metrics(actual_time=duration)
+        metrics_record.print_metrics(self.log)
+
         return result
 
 
@@ -377,18 +396,22 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     def calc(self, compute_options, **args):
         ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
 
+        metrics_record = self._create_metrics_record()
+        calculation_start = datetime.now()
+
         min_lon, min_lat, max_lon, max_lat = bbox.bounds
 
         nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
                              enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
                                                                             ds, start_time, end_time,
+                                                                            metrics_callback=metrics_record.record_metrics,
                                                                             fetch_data=False))]
 
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
@@ -402,6 +425,11 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
         result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
                                     minLat=min_lat, maxLat=max_lat, minLon=min_lon,
                                     maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
+
+        duration = (datetime.now() - calculation_start).total_seconds()
+        metrics_record.record_metrics(actual_time=duration)
+        metrics_record.print_metrics(self.log)
+
         return result
 
 
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 9b00489..b1d11d4 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -13,16 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-import math
 import logging
 from datetime import datetime
+from functools import partial
 
 import numpy as np
 import shapely.geometry
 from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
-
 from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
 
@@ -143,6 +141,9 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         :param args: dict
         :return:
         """
+        request_start_time = datetime.now()
+
+        metrics_record = self._create_metrics_record()
 
         ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)
         self._setQueryParams(ds,
@@ -153,7 +154,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                              start_time,
                              end_time)
 
-        nexus_tiles = self._find_global_tile_set()
+        nexus_tiles = self._find_global_tile_set(metrics_callback=metrics_record.record_metrics)
 
         if len(nexus_tiles) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
@@ -167,7 +168,8 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                                                                 bbox.bounds[2],
                                                                 ds,
                                                                 start_time,
-                                                                end_time)
+                                                                end_time,
+                                                                metrics_callback=metrics_record.record_metrics)
         ndays = len(daysinrange)
         if ndays == 0:
             raise NoDataException(reason="No data found for selected timeframe")
@@ -175,7 +177,6 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         for i, d in enumerate(daysinrange):
             self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
 
-
         self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
         self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
@@ -200,7 +201,9 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         max_time_parts = 72
         num_time_parts = min(max_time_parts, ndays)
 
-        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
+        spark_part_time_ranges = np.tile(
+            np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
+            (len(nexus_tiles_spark), 1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
 
@@ -209,24 +212,18 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         self.log.info('Using {} partitions'.format(spark_nparts))
 
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
-        sum_count_part = rdd.map(self._map)
-        sum_count = \
-            sum_count_part.combineByKey(lambda val: val,
-                                        lambda x, val: (x[0] + val[0],
-                                                        x[1] + val[1]),
-                                        lambda x, y: (x[0] + y[0], x[1] + y[1]))
-        fill = self._fill
-        avg_tiles = \
-            sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
-                          (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
-                          if (cnt_tile[y, x] > 0)
-                          else fill,
-                                      'cnt': cnt_tile[y, x]}
-                                     for x in
-                                     range(sum_tile.shape[1])]
-                                    for y in
-                                    range(sum_tile.shape[0])])).collect()
-
+        metrics_record.record_metrics(partitions=rdd.getNumPartitions())
+        sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics))
+        reduce_duration = 0
+        reduce_start = datetime.now()
+        sum_count = sum_count_part.combineByKey(lambda val: val,
+                                                lambda x, val: (x[0] + val[0],
+                                                                x[1] + val[1]),
+                                                lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        reduce_duration += (datetime.now() - reduce_start).total_seconds()
+        avg_tiles = sum_count.map(partial(calculate_means, metrics_record.record_metrics, self._fill)).collect()
+
+        reduce_start = datetime.now()
         # Combine subset results to produce global map.
         #
         # The tiles below are NOT Nexus objects.  They are tuples
@@ -262,6 +259,8 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                             tile_min_lon, tile_max_lon,
                             y0, y1, x0, x1))
 
+        reduce_duration += (datetime.now() - reduce_start).total_seconds()
+
         # Store global map in a NetCDF file.
         self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
 
@@ -270,6 +269,10 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                      'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                     for x in range(a.shape[1])] for y in range(a.shape[0])]
 
+        total_duration = (datetime.now() - request_start_time).total_seconds()
+        metrics_record.record_metrics(actual_time=total_duration, reduce=reduce_duration)
+        metrics_record.print_metrics(self.log)
+
         return NexusResults(results=results, meta={}, stats=None,
                             computeOptions=None, minLat=bbox.bounds[1],
                             maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
@@ -277,7 +280,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                             endTime=end_time)
 
     @staticmethod
-    def _map(tile_in_spark):
+    def _map(metrics_callback, tile_in_spark):
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
          min_y, max_y, min_x, max_x) = tile_bounds
@@ -294,20 +297,46 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
         cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
         t_start = startTime
+
+        calculation_duration = 0.0
         while t_start <= endTime:
             t_end = min(t_start + t_incr, endTime)
 
-            nexus_tiles = \
-                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
-                                                      min_lon, max_lon,
-                                                      ds=ds,
-                                                      start_time=t_start,
-                                                      end_time=t_end)
+            nexus_tiles = tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                                min_lon, max_lon,
+                                                                ds=ds,
+                                                                start_time=t_start,
+                                                                end_time=t_end,
+                                                                metrics_callback=metrics_callback)
 
+            calculation_start = datetime.now()
             for tile in nexus_tiles:
                 tile.data.data[:, :] = np.nan_to_num(tile.data.data)
                 sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
                 cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
             t_start = t_end + 1
+            calculation_duration += (datetime.now() - calculation_start).total_seconds()
+
+        metrics_callback(calculation=calculation_duration)
 
         return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)
+
+
+def calculate_means(metrics_callback, fill, (bounds, (sum_tile, cnt_tile))):
+    start_time = datetime.now()
+
+    outer = []
+    for y in range(sum_tile.shape[0]):
+        inner = []
+        for x in range(sum_tile.shape[1]):
+            value = {
+                'avg': (sum_tile[y, x] / cnt_tile[y, x]) if (cnt_tile[y, x] > 0) else fill,
+                'cnt': cnt_tile[y, x]
+            }
+            inner.append(value)
+        outer.append(inner)
+
+    duration = (datetime.now() - start_time).total_seconds()
+    metrics_callback(calculation=duration)
+
+    return bounds, outer
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 4a102aa..07b95f2 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-import math
 import calendar
 import itertools
 import logging
 import traceback
 from cStringIO import StringIO
 from datetime import datetime
+from functools import partial
 
 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
@@ -32,7 +31,6 @@ from backports.functools_lru_cache import lru_cache
 from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 from scipy import stats
-
 from webservice import Filtering as filtering
 from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
@@ -40,6 +38,8 @@ from webservice.webmodel import NexusResults, NoDataException, NexusProcessingEx
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
+logger = logging.getLogger(__name__)
+
 
 @nexus_handler
 class TimeSeriesHandlerImpl(SparkHandler):
@@ -168,8 +168,10 @@ class TimeSeriesHandlerImpl(SparkHandler):
         :param args: dict
         :return:
         """
-
-        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(request)
+        start_time = datetime.now()
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(
+            request)
+        metrics_record = self._create_metrics_record()
 
         resultsRaw = []
 
@@ -182,7 +184,8 @@ class TimeSeriesHandlerImpl(SparkHandler):
                                                                     bounding_polygon.bounds[2],
                                                                     shortName,
                                                                     start_seconds_from_epoch,
-                                                                    end_seconds_from_epoch)
+                                                                    end_seconds_from_epoch,
+                                                                    metrics_callback=metrics_record.record_metrics)
             self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
 
             ndays = len(daysinrange)
@@ -194,12 +197,9 @@ class TimeSeriesHandlerImpl(SparkHandler):
                 self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
             spark_nparts = self._spark_nparts(nparts_requested)
             self.log.info('Using {} partitions'.format(spark_nparts))
-            the_time = datetime.now()
             results, meta = spark_driver(daysinrange, bounding_polygon,
-                                         shortName, spark_nparts=spark_nparts,
+                                         shortName, metrics_record.record_metrics, spark_nparts=spark_nparts,
                                          sc=self._sc)
-            self.log.info(
-                "Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
 
             if apply_seasonal_cycle_filter:
                 the_time = datetime.now()
@@ -267,6 +267,10 @@ class TimeSeriesHandlerImpl(SparkHandler):
                                 maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
                                 endTime=end_seconds_from_epoch)
 
+        total_duration = (datetime.now() - start_time).total_seconds()
+        metrics_record.record_metrics(actual_time=total_duration)
+        metrics_record.print_metrics(logger)
+
         self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
         return res
 
@@ -486,7 +490,7 @@ class TimeSeriesResults(NexusResults):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999.,
                  spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
@@ -495,14 +499,15 @@ def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
 
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
-    results = rdd.map(calc_average_on_day).collect()
+    metrics_callback(partitions=rdd.getNumPartitions())
+    results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
 
     return results, {}
 
 
-def calc_average_on_day(tile_in_spark):
+def calc_average_on_day(metrics_callback, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
     from pytz import timezone
@@ -517,7 +522,10 @@ def calc_average_on_day(tile_in_spark):
                                                   dataset,
                                                   timestamps[0],
                                                   timestamps[-1],
-                                                  rows=5000)
+                                                  rows=5000,
+                                                  metrics_callback=metrics_callback)
+
+    calculation_start = datetime.now()
 
     tile_dict = {}
     for timeinseconds in timestamps:
@@ -568,4 +576,8 @@ def calc_average_on_day(tile_in_spark):
             'iso_time': datetime.utcfromtimestamp(int(timeinseconds)).replace(tzinfo=timezone('UTC')).strftime(ISO_8601)
         }
         stats_arr.append(stat)
-    return stats_arr
+
+    calculation_time = (datetime.now() - calculation_start).total_seconds()
+    metrics_callback(calculation=calculation_time)
+
+    return [stats_arr]
diff --git a/analysis/webservice/metrics/MetricsField.py b/analysis/webservice/metrics/MetricsField.py
new file mode 100644
index 0000000..a84a8ce
--- /dev/null
+++ b/analysis/webservice/metrics/MetricsField.py
@@ -0,0 +1,34 @@
+from abc import abstractmethod
+
+
+class MetricsField(object):
+    def __init__(self, key, description, initial_value):
+        self.key = key
+        self.description = description
+        self._value = initial_value
+
+    @abstractmethod
+    def add(self, addend):
+        pass
+
+    def value(self):
+        return self._value
+
+
+class SparkAccumulatorMetricsField(MetricsField):
+    def __init__(self, key, description, accumulator):
+        super(SparkAccumulatorMetricsField, self).__init__(key, description, accumulator)
+
+    def add(self, addend):
+        self._value.add(addend)
+
+    def value(self):
+        return self._value.value
+
+
+class NumberMetricsField(MetricsField):
+    def __init__(self, key, description, initial_value=0):
+        super(NumberMetricsField, self).__init__(key, description, initial_value)
+
+    def add(self, addend):
+        self._value += addend
diff --git a/analysis/webservice/metrics/MetricsRecord.py b/analysis/webservice/metrics/MetricsRecord.py
new file mode 100644
index 0000000..7556ade
--- /dev/null
+++ b/analysis/webservice/metrics/MetricsRecord.py
@@ -0,0 +1,32 @@
+from collections import OrderedDict
+import logging
+
+metrics_logger = logging.getLogger(__name__)
+
+
+class MetricsRecord(object):
+    def __init__(self, fields):
+        self._fields = OrderedDict()
+        for field in fields:
+            self._fields[field.key] = field
+
+    def record_metrics(self, **kwargs):
+        for field_key, addend in kwargs.items():
+            if field_key in self._fields:
+                self._fields[field_key].add(addend)
+
+    def print_metrics(self, logger=None, include_zero_values=False):
+        if not logger:
+            logger = metrics_logger
+
+        logging_lines = []
+        for field in self._fields.values():
+            value = field.value()
+            if value > 0 or include_zero_values:
+                line = "{description}: {value}".format(description=field.description, value=field.value())
+                logging_lines.append(line)
+
+        logger.info('\n'.join(logging_lines))
+
+    def write_metrics(self):
+        raise NotImplementedError
diff --git a/analysis/webservice/metrics/__init__.py b/analysis/webservice/metrics/__init__.py
new file mode 100644
index 0000000..48b579e
--- /dev/null
+++ b/analysis/webservice/metrics/__init__.py
@@ -0,0 +1,2 @@
+from MetricsField import MetricsField, SparkAccumulatorMetricsField, NumberMetricsField
+from MetricsRecord import MetricsRecord
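
A note on the callback plumbing used throughout the Spark handlers in this
patch: metrics_record.record_metrics is threaded into the per-tile worker
functions with functools.partial, so executor-side code can report timings
back through the record. A stripped-down sketch of that calling convention
(no Spark here; process_tile and the plain-dict collector are hypothetical
stand-ins for the real workers and MetricsRecord.record_metrics):

    from datetime import datetime
    from functools import partial

    def process_tile(metrics_callback, tile_id):
        # Hypothetical worker: time the per-tile calculation and report it,
        # mirroring hofmoeller_stats and TimeAvgMapSparkHandlerImpl._map.
        start = datetime.now()
        result = tile_id * 2  # stand-in for the real computation
        metrics_callback(calculation=(datetime.now() - start).total_seconds())
        return result

    totals = {}

    def record(**kwargs):
        # Stand-in for MetricsRecord.record_metrics.
        for key, addend in kwargs.items():
            totals[key] = totals.get(key, 0.0) + addend

    # In the handlers this is rdd.map(partial(..., record_metrics)) or
    # rdd.flatMap(...); built-in map() shows the same calling convention.
    results = list(map(partial(process_tile, record), [1, 2, 3]))
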
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 81f086f..71fd0bb 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import ConfigParser
+import logging
 import sys
 from datetime import datetime
 from functools import wraps
@@ -21,33 +22,49 @@ from functools import wraps
 import numpy as np
 import numpy.ma as ma
 import pkg_resources
-import dao.CassandraProxy
-import dao.S3Proxy
-import dao.DynamoProxy
-import dao.SolrProxy
 from pytz import timezone, UTC
 from shapely.geometry import MultiPolygon, box
 
+import dao.CassandraProxy
+import dao.DynamoProxy
+import dao.S3Proxy
+import dao.SolrProxy
 from model.nexusmodel import Tile, BBox, TileStats
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger("testing")
+
 
 def tile_data(default_fetch=True):
     def tile_data_decorator(func):
         @wraps(func)
         def fetch_data_for_func(*args, **kwargs):
-            if ('fetch_data' not in kwargs and not default_fetch) or (
-                            'fetch_data' in kwargs and not kwargs['fetch_data']):
-                solr_docs = func(*args, **kwargs)
-                tiles = args[0]._solr_docs_to_tiles(*solr_docs)
-                return tiles
-            else:
-                solr_docs = func(*args, **kwargs)
-                tiles = args[0]._solr_docs_to_tiles(*solr_docs)
+            solr_start = datetime.now()
+            solr_docs = func(*args, **kwargs)
+            solr_duration = (datetime.now() - solr_start).total_seconds()
+            tiles = args[0]._solr_docs_to_tiles(*solr_docs)
+
+            cassandra_duration = 0
+            if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
                 if len(tiles) > 0:
+                    cassandra_start = datetime.now()
                     args[0].fetch_data_for_tiles(*tiles)
-                return tiles
+                    cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
+
+            if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
+                try:
+                    kwargs['metrics_callback'](cassandra=cassandra_duration,
+                                               solr=solr_duration,
+                                               num_tiles=len(tiles))
+                except Exception as e:
+                    logger.error("Metrics callback '{}' raised an exception. Will continue anyway. "
+                                 "The exception was: {}".format(kwargs['metrics_callback'], e))
+            return tiles
 
         return fetch_data_for_func
 
@@ -98,9 +115,16 @@ class NexusTileService(object):
     def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
         return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
 
-    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, **kwargs):
-        return self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
-                                                 **kwargs)
+    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+                               metrics_callback=None, **kwargs):
+        start = datetime.now()
+        result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
+                                                            end_time,
+                                                            **kwargs)
+        duration = (datetime.now() - start).total_seconds()
+        if metrics_callback:
+            metrics_callback(solr=duration)
+        return result
 
     @tile_data()
     def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
@@ -127,7 +151,8 @@ class NexusTileService(object):
         :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
         """
         try:
-            tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year)
+            tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds,
+                                                                                        day_of_year)
         except IndexError:
             raise NexusTileServiceException("No tile found."), None, sys.exc_info()[2]
 
@@ -135,13 +160,14 @@ class NexusTileService(object):
 
     @tile_data()
     def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
-                                                        **kwargs)
+        return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
+                                                                 rows=5000,
+                                                                 **kwargs)
 
     @tile_data()
     def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
         return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
-                                                            **kwargs)
+                                                                     **kwargs)
 
     @tile_data()
     def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
@@ -151,7 +177,7 @@ class NexusTileService(object):
         if type(end_time) is datetime:
             end_time = (end_time - EPOCH).total_seconds()
         return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
-                                                            end_time, **kwargs)
+                                                                     end_time, **kwargs)
 
     @tile_data()
     def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
@@ -159,8 +185,9 @@ class NexusTileService(object):
         if 'sort' in kwargs.keys():
             tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
         else:
-            tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time,
-                                                                     **kwargs)
+            tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time,
+                                                                              end_time,
+                                                                              **kwargs)
         return tiles
 
     @tile_data()
@@ -206,14 +233,16 @@ class NexusTileService(object):
         :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
         :return:
         """
-        tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time,
-                                                      end_time)
+        tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds,
+                                                               start_time,
+                                                               end_time)
         return tiles
 
     @tile_data()
     def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
-                                                          **kwargs)
+        return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
+                                                                   rows=5000,
+                                                                   **kwargs)
 
     def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
                                  **kwargs):
@@ -225,7 +254,8 @@ class NexusTileService(object):
         return tiles
 
     def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0, end_time=-1, **kwargs):
-        tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time, **kwargs)
+        tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time,
+                                           **kwargs)
         tiles = self.mask_tiles_to_polygon(polygon, tiles)
         if 0 < start_time <= end_time:
             tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
@@ -260,7 +290,7 @@ class NexusTileService(object):
 
     def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
         tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
-                                                             **kwargs)
+                                                                      **kwargs)
 
         return tiles
 
@@ -446,12 +476,14 @@ class NexusTileService(object):
                 pass
 
             try:
-                tile.min_time = datetime.strptime(solr_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
+                tile.min_time = datetime.strptime(solr_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+                    tzinfo=UTC)
             except KeyError:
                 pass
 
             try:
-                tile.max_time = datetime.strptime(solr_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
+                tile.max_time = datetime.strptime(solr_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+                    tzinfo=UTC)
             except KeyError:
                 pass
 
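The timing logic added to the tile_data decorator above follows a pattern
worth calling out: time the wrapped call and, if the caller supplied a
metrics_callback keyword argument, forward the duration to it. A simplified,
self-contained sketch of that pattern (the real decorator also decides whether
to fetch Cassandra data, reports num_tiles, and leaves the kwarg in place;
query_metadata below is a hypothetical stand-in for a Solr lookup):

    from datetime import datetime
    from functools import wraps

    def timed(metric_key):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Simplification: pop the callback so the wrapped function never sees it.
                callback = kwargs.pop('metrics_callback', None)
                start = datetime.now()
                result = func(*args, **kwargs)
                if callback is not None:
                    callback(**{metric_key: (datetime.now() - start).total_seconds()})
                return result
            return wrapper
        return decorator

    timings = {}

    @timed('solr')
    def query_metadata(query):
        return [query]  # stand-in for a Solr round trip

    docs = query_metadata('tile_min_time_dt:[* TO *]', metrics_callback=timings.update)
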
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index f4e206a..632357f 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -18,6 +18,10 @@ FROM nexusjpl/alpine-pyspark:2.4.4
 MAINTAINER Apache SDAP "dev@sdap.apache.org"
 
 
+ARG CONDA_VERSION="4.7.12.1"
+ARG CONDA_MD5="81c773ff87af5cfac79ab862942ab6b3"
+ARG CONDA_DIR="/opt/conda"
+
 ENV PYTHONPATH=${PYTHONPATH}:/opt/spark/python:/opt/spark/python/lib/py4j-0.10.7-src.zip:/opt/spark/python/lib/pyspark.zip/python:/usr/lib \
     NEXUS_SRC=/tmp/incubator-sdap-nexus \
     PROJ_LIB=/opt/conda/lib/python2.7/site-packages/pyproj/data	\
@@ -52,10 +56,6 @@ RUN apk --no-cache add wget zlib && \
     ln -s /lib/libc.musl-x86_64.so.1 /usr/glibc-compat/lib && \
     ln -s /usr/lib/libgcc_s.so.1 /usr/glibc-compat/lib
 
-ARG CONDA_VERSION="4.7.12.1"
-ARG CONDA_MD5="81c773ff87af5cfac79ab862942ab6b3"
-ARG CONDA_DIR="/opt/conda"
-
 COPY docker/nexus-webapp/install_conda.sh ./install_conda.sh
 RUN /tmp/install_conda.sh