Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/22 23:13:18 UTC
[incubator-sdap-nexus] branch master updated: SDAP-232: Add performance metrics logging. (#98)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 5c3d5d8 SDAP-232: Add performance metrics logging. (#98)
5c3d5d8 is described below
commit 5c3d5d82b9817499ce2af9161402272f7d3049fd
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Jun 22 16:13:08 2020 -0700
SDAP-232: Add performance metrics logging. (#98)
Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
analysis/setup.py | 3 +-
analysis/webservice/NexusHandler.py | 39 +++++++--
.../webservice/algorithms_spark/HofMoellerSpark.py | 42 ++++++++--
.../webservice/algorithms_spark/TimeAvgMapSpark.py | 93 +++++++++++++--------
.../webservice/algorithms_spark/TimeSeriesSpark.py | 42 ++++++----
analysis/webservice/metrics/MetricsField.py | 34 ++++++++
analysis/webservice/metrics/MetricsRecord.py | 32 ++++++++
analysis/webservice/metrics/__init__.py | 2 +
data-access/nexustiles/nexustiles.py | 94 +++++++++++++++-------
docker/nexus-webapp/Dockerfile | 8 +-
10 files changed, 292 insertions(+), 97 deletions(-)
diff --git a/analysis/setup.py b/analysis/setup.py
index 57089fc..d6d6803 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -40,7 +40,8 @@ setuptools.setup(
'webservice',
'webservice.algorithms',
'webservice.algorithms.doms',
- 'webservice.algorithms_spark'
+ 'webservice.algorithms_spark',
+ 'webservice.metrics'
],
package_data={
'webservice': ['config/web.ini', 'config/algorithms.ini'],
diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index fd3cb4b..87d2af7 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -21,7 +21,8 @@ import types
import numpy as np
from netCDF4 import Dataset
from nexustiles.nexustiles import NexusTileService
-
+from webservice.metrics.MetricsField import NumberMetricsField, SparkAccumulatorMetricsField
+from webservice.metrics.MetricsRecord import MetricsRecord
from webservice.webmodel import NexusProcessingException
AVAILABLE_HANDLERS = []
@@ -329,10 +330,10 @@ class SparkHandler(NexusHandler):
self._endYear = end_year
self._climMonth = clim_month
self._fill = fill
-
+
def _set_info_from_tile_set(self, nexus_tiles):
ntiles = len(nexus_tiles)
- self.log.debug('Attempting to extract info from {0} tiles'.\
+ self.log.debug('Attempting to extract info from {0} tiles'. \
format(ntiles))
status = False
self._latRes = None
@@ -350,7 +351,7 @@ class SparkHandler(NexusHandler):
if (len(lons) > 1):
self._lonRes = abs(lons[1] - lons[0])
if ((self._latRes is not None) and
- (self._lonRes is not None)):
+ (self._lonRes is not None)):
lats_agg = np.concatenate([tile.latitudes.compressed()
for tile in nexus_tiles])
lons_agg = np.concatenate([tile.longitudes.compressed()
@@ -367,7 +368,7 @@ class SparkHandler(NexusHandler):
break
return status
- def _find_global_tile_set(self):
+ def _find_global_tile_set(self, metrics_callback=None):
# This only works for a single dataset. If more than one is provided,
# we use the first one and ignore the rest.
if type(self._ds) in (list, tuple):
@@ -382,7 +383,8 @@ class SparkHandler(NexusHandler):
self._maxLon,
ds,
self._startTime,
- self._endTime)
+ self._endTime,
+ metrics_callback=metrics_callback)
# Empty tile set will be returned upon failure to find the global
# tile set.
@@ -391,7 +393,9 @@ class SparkHandler(NexusHandler):
# Check one time stamp at a time and attempt to extract the global
# tile set.
for t in t_in_range:
- nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=ds, start_time=t, end_time=t)
+ nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon,
+ self._maxLon, ds=ds, start_time=t, end_time=t,
+ metrics_callback=metrics_callback)
if self._set_info_from_tile_set(nexus_tiles):
# Successfully retrieved global tile set from nexus_tiles,
# so no need to check any other time stamps.
@@ -581,6 +585,27 @@ class SparkHandler(NexusHandler):
max_parallelism)
return num_partitions
+ def _create_metrics_record(self):
+ return MetricsRecord([
+ SparkAccumulatorMetricsField(key='num_tiles',
+ description='Number of tiles fetched',
+ accumulator=self._sc.accumulator(0)),
+ SparkAccumulatorMetricsField(key='partitions',
+ description='Number of Spark partitions',
+ accumulator=self._sc.accumulator(0)),
+ SparkAccumulatorMetricsField(key='cassandra',
+ description='Cumulative time to fetch data from Cassandra',
+ accumulator=self._sc.accumulator(0)),
+ SparkAccumulatorMetricsField(key='solr',
+ description='Cumulative time to fetch data from Solr',
+ accumulator=self._sc.accumulator(0)),
+ SparkAccumulatorMetricsField(key='calculation',
+ description='Cumulative time to do calculations',
+ accumulator=self._sc.accumulator(0)),
+ NumberMetricsField(key='reduce', description='Actual time to reduce results'),
+ NumberMetricsField(key="actual_time", description="Total (actual) time")
+ ])
+
def executeInitializers(config):
[wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
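
For context on how the new _create_metrics_record is consumed: record_metrics is the callback threaded through the tile queries and Spark maps in the files below, and the Spark-accumulator-backed fields are what make updates from executor tasks visible back on the driver. A minimal standalone sketch of that behavior (the local SparkContext and logger name are illustrative, not part of this commit):

    import logging
    from pyspark import SparkContext
    from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField

    logging.basicConfig(level=logging.INFO)
    sc = SparkContext(master='local[2]', appName='metrics-sketch')
    record = MetricsRecord([
        SparkAccumulatorMetricsField(key='num_tiles',
                                     description='Number of tiles fetched',
                                     accumulator=sc.accumulator(0)),
    ])

    # Each executor task reports through the same callback the handlers use;
    # the accumulator folds the per-task increments back into the driver.
    sc.parallelize(range(4), 2).foreach(lambda _: record.record_metrics(num_tiles=1))
    record.print_metrics(logging.getLogger('example'))  # Number of tiles fetched: 4
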
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 1696732..5fd01ba 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -17,6 +17,7 @@ import itertools
import logging
from cStringIO import StringIO
from datetime import datetime
+from functools import partial
import matplotlib.pyplot as plt
import mpld3
@@ -26,7 +27,6 @@ from matplotlib import cm
from matplotlib.ticker import FuncFormatter
from nexustiles.nexustiles import NexusTileService
from pytz import timezone
-
from webservice.NexusHandler import SparkHandler, nexus_handler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
@@ -40,7 +40,7 @@ LONGITUDE = 1
class HofMoellerCalculator(object):
@staticmethod
- def hofmoeller_stats(tile_in_spark):
+ def hofmoeller_stats(metrics_callback, tile_in_spark):
(latlon, tile_id, index,
min_lat, max_lat, min_lon, max_lon) = tile_in_spark
@@ -48,7 +48,8 @@ class HofMoellerCalculator(object):
tile_service = NexusTileService()
try:
# Load the dataset tile
- tile = tile_service.find_tile_by_id(tile_id)[0]
+ tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
+ calculation_start = datetime.now()
# Mask it to the search domain
tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
min_lon, max_lon, [tile])[0]
@@ -93,6 +94,9 @@ class HofMoellerCalculator(object):
np.max(vals).item(),
np.min(vals).item(),
np.var(vals).item())))
+ calculation_duration = (datetime.now() - calculation_start).total_seconds()
+ metrics_callback(calculation=calculation_duration)
+
return stats
@@ -261,7 +265,8 @@ def hof_tuple_to_dict(t, avg_var_name):
'max': t[6],
'min': t[7]}
-def spark_driver(sc, latlon, nexus_tiles_spark):
+
+def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback):
# Parallelize list of tile ids
rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
if latlon == 0:
@@ -276,10 +281,12 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
# Create a set of key-value pairs where the key is (time, lat|lon) and
# the value is a tuple of intermediate statistics for the specified
# coordinate within a single NEXUS tile.
- results = rdd.flatMap(HofMoellerCalculator.hofmoeller_stats)
+ metrics_callback(partitions=rdd.getNumPartitions())
+ results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback))
# Combine tuples across tiles with input key = (time, lat|lon)
# Output a key value pair with key = (time)
+ reduce_start = datetime.now()
results = results.combineByKey(lambda val: (hof_tuple_time(val), val),
lambda x, val: (hof_tuple_time(x),
hof_tuple_combine(x[1],
@@ -317,6 +324,9 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
values(). \
collect()
+ reduce_duration = (datetime.now() - reduce_start).total_seconds()
+ metrics_callback(reduce=reduce_duration)
+
return results
@@ -335,18 +345,22 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
def calc(self, compute_options, **args):
ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ metrics_record = self._create_metrics_record()
+ calculation_start = datetime.now()
+
min_lon, min_lat, max_lon, max_lat = bbox.bounds
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time,
+ metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
+ results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
@@ -359,6 +373,11 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
minLat=min_lat, maxLat=max_lat, minLon=min_lon,
maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
+
+ duration = (datetime.now() - calculation_start).total_seconds()
+ metrics_record.record_metrics(actual_time=duration)
+ metrics_record.print_metrics(self.log)
+
return result
@@ -377,18 +396,22 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
def calc(self, compute_options, **args):
ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ metrics_record = self._create_metrics_record()
+ calculation_start = datetime.now()
+
min_lon, min_lat, max_lon, max_lat = bbox.bounds
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time,
+ metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
+ results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
@@ -402,6 +425,11 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
minLat=min_lat, maxLat=max_lat, minLon=min_lon,
maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
+
+ duration = (datetime.now() - calculation_start).total_seconds()
+ metrics_record.record_metrics(actual_time=duration)
+ metrics_record.print_metrics(self.log)
+
return result
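
The switch to partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback) works because RDD.flatMap hands each element to a one-argument function, so the callback has to be pre-bound. A tiny Spark-free illustration of that binding (the names here are hypothetical):

    from functools import partial

    def hof_stats(metrics_callback, tile_in_spark):
        metrics_callback(calculation=0.1)  # report timing through the callback
        return [tile_in_spark * 2]

    noop = lambda **kwargs: None      # stands in for record_metrics
    bound = partial(hof_stats, noop)  # now a one-argument function, as flatMap expects
    print(bound(21))                  # [42]
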
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 9b00489..b1d11d4 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -13,16 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import math
import logging
from datetime import datetime
+from functools import partial
import numpy as np
import shapely.geometry
from nexustiles.nexustiles import NexusTileService
from pytz import timezone
-
from webservice.NexusHandler import nexus_handler, SparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -143,6 +141,9 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
:param args: dict
:return:
"""
+ request_start_time = datetime.now()
+
+ metrics_record = self._create_metrics_record()
ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)
self._setQueryParams(ds,
@@ -153,7 +154,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
start_time,
end_time)
- nexus_tiles = self._find_global_tile_set()
+ nexus_tiles = self._find_global_tile_set(metrics_callback=metrics_record.record_metrics)
if len(nexus_tiles) == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -167,7 +168,8 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
bbox.bounds[2],
ds,
start_time,
- end_time)
+ end_time,
+ metrics_callback=metrics_record.record_metrics)
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -175,7 +177,6 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
for i, d in enumerate(daysinrange):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
-
self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
@@ -200,7 +201,9 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
max_time_parts = 72
num_time_parts = min(max_time_parts, ndays)
- spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
+ spark_part_time_ranges = np.tile(
+ np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
+ (len(nexus_tiles_spark), 1))
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
@@ -209,24 +212,18 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- sum_count_part = rdd.map(self._map)
- sum_count = \
- sum_count_part.combineByKey(lambda val: val,
- lambda x, val: (x[0] + val[0],
- x[1] + val[1]),
- lambda x, y: (x[0] + y[0], x[1] + y[1]))
- fill = self._fill
- avg_tiles = \
- sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
- (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
- if (cnt_tile[y, x] > 0)
- else fill,
- 'cnt': cnt_tile[y, x]}
- for x in
- range(sum_tile.shape[1])]
- for y in
- range(sum_tile.shape[0])])).collect()
-
+ metrics_record.record_metrics(partitions=rdd.getNumPartitions())
+ sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics))
+ reduce_duration = 0
+ reduce_start = datetime.now()
+ sum_count = sum_count_part.combineByKey(lambda val: val,
+ lambda x, val: (x[0] + val[0],
+ x[1] + val[1]),
+ lambda x, y: (x[0] + y[0], x[1] + y[1]))
+ reduce_duration += (datetime.now() - reduce_start).total_seconds()
+ avg_tiles = sum_count.map(partial(calculate_means, metrics_record.record_metrics, self._fill)).collect()
+
+ reduce_start = datetime.now()
# Combine subset results to produce global map.
#
# The tiles below are NOT Nexus objects. They are tuples
@@ -262,6 +259,8 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
tile_min_lon, tile_max_lon,
y0, y1, x0, x1))
+ reduce_duration += (datetime.now() - reduce_start).total_seconds()
+
# Store global map in a NetCDF file.
self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
@@ -270,6 +269,10 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
for x in range(a.shape[1])] for y in range(a.shape[0])]
+ total_duration = (datetime.now() - request_start_time).total_seconds()
+ metrics_record.record_metrics(actual_time=total_duration, reduce=reduce_duration)
+ metrics_record.print_metrics(self.log)
+
return NexusResults(results=results, meta={}, stats=None,
computeOptions=None, minLat=bbox.bounds[1],
maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
@@ -277,7 +280,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
endTime=end_time)
@staticmethod
- def _map(tile_in_spark):
+ def _map(metrics_callback, tile_in_spark):
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
min_y, max_y, min_x, max_x) = tile_bounds
@@ -294,20 +297,46 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
t_start = startTime
+
+ calculation_duration = 0.0
while t_start <= endTime:
t_end = min(t_start + t_incr, endTime)
- nexus_tiles = \
- tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
- min_lon, max_lon,
- ds=ds,
- start_time=t_start,
- end_time=t_end)
+ nexus_tiles = tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+ min_lon, max_lon,
+ ds=ds,
+ start_time=t_start,
+ end_time=t_end,
+ metrics_callback=metrics_callback)
+ calculation_start = datetime.now()
for tile in nexus_tiles:
tile.data.data[:, :] = np.nan_to_num(tile.data.data)
sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
t_start = t_end + 1
+ calculation_duration += (datetime.now() - calculation_start).total_seconds()
+
+ metrics_callback(calculation=calculation_duration)
return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)
+
+
+def calculate_means(metrics_callback, fill, (bounds, (sum_tile, cnt_tile))):
+ start_time = datetime.now()
+
+ outer = []
+ for y in range(sum_tile.shape[0]):
+ inner = []
+ for x in range(sum_tile.shape[1]):
+ value = {
+ 'avg': (sum_tile[y, x] / cnt_tile[y, x]) if (cnt_tile[y, x] > 0) else fill,
+ 'cnt': cnt_tile[y, x]
+ }
+ inner.append(value)
+ outer.append(inner)
+
+ duration = (datetime.now() - start_time).total_seconds()
+ metrics_callback(calculation=duration)
+
+ return bounds, outer
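
calculate_means replaces the old inline lambda; its Python 2 tuple-parameter signature matches the (bounds, (sum_tile, cnt_tile)) pairs coming out of combineByKey. The mean-or-fill rule it applies, shown in isolation with hypothetical values:

    import numpy as np

    fill = -9999.
    sum_tile = np.array([[4.0, 0.0]])
    cnt_tile = np.array([[2, 0]], dtype=np.uint32)
    means = [[{'avg': (sum_tile[y, x] / cnt_tile[y, x]) if cnt_tile[y, x] > 0 else fill,
               'cnt': cnt_tile[y, x]}
              for x in range(sum_tile.shape[1])]
             for y in range(sum_tile.shape[0])]
    print(means)  # avg is 2.0 where cnt > 0; the fill value is used where cnt == 0
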
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 4a102aa..07b95f2 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,14 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import math
import calendar
import itertools
import logging
import traceback
from cStringIO import StringIO
from datetime import datetime
+from functools import partial
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
@@ -32,7 +31,6 @@ from backports.functools_lru_cache import lru_cache
from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from scipy import stats
-
from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler, SparkHandler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
@@ -40,6 +38,8 @@ from webservice.webmodel import NexusResults, NoDataException, NexusProcessingEx
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+logger = logging.getLogger(__name__)
+
@nexus_handler
class TimeSeriesHandlerImpl(SparkHandler):
@@ -168,8 +168,10 @@ class TimeSeriesHandlerImpl(SparkHandler):
:param args: dict
:return:
"""
-
- ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(request)
+ start_time = datetime.now()
+ ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(
+ request)
+ metrics_record = self._create_metrics_record()
resultsRaw = []
@@ -182,7 +184,8 @@ class TimeSeriesHandlerImpl(SparkHandler):
bounding_polygon.bounds[2],
shortName,
start_seconds_from_epoch,
- end_seconds_from_epoch)
+ end_seconds_from_epoch,
+ metrics_callback=metrics_record.record_metrics)
self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
ndays = len(daysinrange)
@@ -194,12 +197,9 @@ class TimeSeriesHandlerImpl(SparkHandler):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
- the_time = datetime.now()
results, meta = spark_driver(daysinrange, bounding_polygon,
- shortName, spark_nparts=spark_nparts,
+ shortName, metrics_record.record_metrics, spark_nparts=spark_nparts,
sc=self._sc)
- self.log.info(
- "Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
if apply_seasonal_cycle_filter:
the_time = datetime.now()
@@ -267,6 +267,10 @@ class TimeSeriesHandlerImpl(SparkHandler):
maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
endTime=end_seconds_from_epoch)
+ total_duration = (datetime.now() - start_time).total_seconds()
+ metrics_record.record_metrics(actual_time=total_duration)
+ metrics_record.print_metrics(logger)
+
self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
return res
@@ -486,7 +490,7 @@ class TimeSeriesResults(NexusResults):
return sio.getvalue()
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999.,
spark_nparts=1, sc=None):
nexus_tiles_spark = [(bounding_polygon.wkt, ds,
list(daysinrange_part), fill)
@@ -495,14 +499,15 @@ def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
# Launch Spark computations
rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
- results = rdd.map(calc_average_on_day).collect()
+ metrics_callback(partitions=rdd.getNumPartitions())
+ results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect()
results = list(itertools.chain.from_iterable(results))
results = sorted(results, key=lambda entry: entry["time"])
return results, {}
-def calc_average_on_day(tile_in_spark):
+def calc_average_on_day(metrics_callback, tile_in_spark):
import shapely.wkt
from datetime import datetime
from pytz import timezone
@@ -517,7 +522,10 @@ def calc_average_on_day(tile_in_spark):
dataset,
timestamps[0],
timestamps[-1],
- rows=5000)
+ rows=5000,
+ metrics_callback=metrics_callback)
+
+ calculation_start = datetime.now()
tile_dict = {}
for timeinseconds in timestamps:
@@ -568,4 +576,8 @@ def calc_average_on_day(tile_in_spark):
'iso_time': datetime.utcfromtimestamp(int(timeinseconds)).replace(tzinfo=timezone('UTC')).strftime(ISO_8601)
}
stats_arr.append(stat)
- return stats_arr
+
+ calculation_time = (datetime.now() - calculation_start).total_seconds()
+ metrics_callback(calculation=calculation_time)
+
+ return [stats_arr]
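
Note how the map-to-flatMap change stays result-compatible: calc_average_on_day now returns [stats_arr], so flatMap unwraps exactly one level and collect() still yields a list of per-day lists for itertools.chain.from_iterable to flatten. A toy check of that equivalence, assuming a local SparkContext:

    import itertools
    from pyspark import SparkContext

    sc = SparkContext(master='local[2]', appName='flatmap-sketch')
    days = [[1, 2], [3], [4, 5]]

    mapped = sc.parallelize(days).map(lambda stats_arr: stats_arr).collect()
    flat_mapped = sc.parallelize(days).flatMap(lambda stats_arr: [stats_arr]).collect()
    assert mapped == flat_mapped                             # both are [[1, 2], [3], [4, 5]]
    print(list(itertools.chain.from_iterable(flat_mapped)))  # [1, 2, 3, 4, 5]
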
diff --git a/analysis/webservice/metrics/MetricsField.py b/analysis/webservice/metrics/MetricsField.py
new file mode 100644
index 0000000..a84a8ce
--- /dev/null
+++ b/analysis/webservice/metrics/MetricsField.py
@@ -0,0 +1,34 @@
+from abc import abstractmethod
+
+
+class MetricsField(object):
+ def __init__(self, key, description, initial_value):
+ self.key = key
+ self.description = description
+ self._value = initial_value
+
+ @abstractmethod
+ def add(self, addend):
+ pass
+
+ def value(self):
+ return self._value
+
+
+class SparkAccumulatorMetricsField(MetricsField):
+ def __init__(self, key, description, accumulator):
+ super(SparkAccumulatorMetricsField, self).__init__(key, description, accumulator)
+
+ def add(self, addend):
+ self._value.add(addend)
+
+ def value(self):
+ return self._value.value
+
+
+class NumberMetricsField(MetricsField):
+ def __init__(self, key, description, initial_value=0):
+ super(NumberMetricsField, self).__init__(key, description, initial_value)
+
+ def add(self, addend):
+ self._value += addend
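
Both concrete fields share the add()/value() interface; only the storage differs (a plain number versus a Spark accumulator). The plain-number variant in isolation, a minimal sketch that needs no SparkContext:

    from webservice.metrics import NumberMetricsField

    field = NumberMetricsField(key='reduce', description='Actual time to reduce results')
    field.add(1.5)   # durations accumulate across calls
    field.add(0.25)
    print(field.value())  # 1.75
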
diff --git a/analysis/webservice/metrics/MetricsRecord.py b/analysis/webservice/metrics/MetricsRecord.py
new file mode 100644
index 0000000..7556ade
--- /dev/null
+++ b/analysis/webservice/metrics/MetricsRecord.py
@@ -0,0 +1,32 @@
+from collections import OrderedDict
+import logging
+
+metrics_logger = logging.getLogger(__name__)
+
+
+class MetricsRecord(object):
+ def __init__(self, fields):
+ self._fields = OrderedDict()
+ for field in fields:
+ self._fields[field.key] = field
+
+ def record_metrics(self, **kwargs):
+ for field_key, addend in kwargs.items():
+ if field_key in self._fields:
+ self._fields[field_key].add(addend)
+
+ def print_metrics(self, logger=None, include_zero_values=False):
+ if not logger:
+ logger = metrics_logger
+
+ logging_lines = []
+ for field in self._fields.values():
+ value = field.value()
+ if value > 0 or include_zero_values:
+ line = "{description}: {value}".format(description=field.description, value=field.value())
+ logging_lines.append(line)
+
+ logger.info('\n'.join(logging_lines))
+
+ def write_metrics(self):
+ raise NotImplementedError
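
record_metrics silently ignores keys that were not registered up front, and print_metrics skips zero-valued fields unless include_zero_values is set. A minimal sketch with plain-number fields (the logger name is illustrative):

    import logging
    from webservice.metrics import MetricsRecord, NumberMetricsField

    logging.basicConfig(level=logging.INFO)
    record = MetricsRecord([
        NumberMetricsField(key='solr', description='Cumulative time to fetch data from Solr'),
        NumberMetricsField(key='reduce', description='Actual time to reduce results'),
    ])
    record.record_metrics(solr=0.8)
    record.record_metrics(solr=0.4, unknown_key=1)  # unknown_key is ignored
    record.print_metrics(logging.getLogger('example'))
    # logs only: Cumulative time to fetch data from Solr: 1.2  ('reduce' is still 0)
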
diff --git a/analysis/webservice/metrics/__init__.py b/analysis/webservice/metrics/__init__.py
new file mode 100644
index 0000000..48b579e
--- /dev/null
+++ b/analysis/webservice/metrics/__init__.py
@@ -0,0 +1,2 @@
+from MetricsField import MetricsField, SparkAccumulatorMetricsField, NumberMetricsField
+from MetricsRecord import MetricsRecord
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 81f086f..71fd0bb 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -14,6 +14,7 @@
# limitations under the License.
import ConfigParser
+import logging
import sys
from datetime import datetime
from functools import wraps
@@ -21,33 +22,49 @@ from functools import wraps
import numpy as np
import numpy.ma as ma
import pkg_resources
-import dao.CassandraProxy
-import dao.S3Proxy
-import dao.DynamoProxy
-import dao.SolrProxy
from pytz import timezone, UTC
from shapely.geometry import MultiPolygon, box
+import dao.CassandraProxy
+import dao.DynamoProxy
+import dao.S3Proxy
+import dao.SolrProxy
from model.nexusmodel import Tile, BBox, TileStats
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger("testing")
+
def tile_data(default_fetch=True):
def tile_data_decorator(func):
@wraps(func)
def fetch_data_for_func(*args, **kwargs):
- if ('fetch_data' not in kwargs and not default_fetch) or (
- 'fetch_data' in kwargs and not kwargs['fetch_data']):
- solr_docs = func(*args, **kwargs)
- tiles = args[0]._solr_docs_to_tiles(*solr_docs)
- return tiles
- else:
- solr_docs = func(*args, **kwargs)
- tiles = args[0]._solr_docs_to_tiles(*solr_docs)
+ solr_start = datetime.now()
+ solr_docs = func(*args, **kwargs)
+ solr_duration = (datetime.now() - solr_start).total_seconds()
+ tiles = args[0]._solr_docs_to_tiles(*solr_docs)
+
+ cassandra_duration = 0
+ if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
if len(tiles) > 0:
+ cassandra_start = datetime.now()
args[0].fetch_data_for_tiles(*tiles)
- return tiles
+ cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
+
+ if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
+ try:
+ kwargs['metrics_callback'](cassandra=cassandra_duration,
+ solr=solr_duration,
+ num_tiles=len(tiles))
+ except Exception as e:
+ logger.error("Metrics callback '{}'raised an exception. Will continue anyway. " +
+ "The exception was: {}".format(kwargs['metrics_callback'], e))
+ return tiles
return fetch_data_for_func
@@ -98,9 +115,16 @@ class NexusTileService(object):
def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
- def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, **kwargs):
- return self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
- **kwargs)
+ def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+ metrics_callback=None, **kwargs):
+ start = datetime.now()
+ result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
+ end_time,
+ **kwargs)
+ duration = (datetime.now() - start).total_seconds()
+ if metrics_callback:
+ metrics_callback(solr=duration)
+ return result
@tile_data()
def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
@@ -127,7 +151,8 @@ class NexusTileService(object):
:return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
"""
try:
- tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year)
+ tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds,
+ day_of_year)
except IndexError:
raise NexusTileServiceException("No tile found."), None, sys.exc_info()[2]
@@ -135,13 +160,14 @@ class NexusTileService(object):
@tile_data()
def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
- return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
- **kwargs)
+ return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
+ rows=5000,
+ **kwargs)
@tile_data()
def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
- **kwargs)
+ **kwargs)
@tile_data()
def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
@@ -151,7 +177,7 @@ class NexusTileService(object):
if type(end_time) is datetime:
end_time = (end_time - EPOCH).total_seconds()
return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
- end_time, **kwargs)
+ end_time, **kwargs)
@tile_data()
def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
@@ -159,8 +185,9 @@ class NexusTileService(object):
if 'sort' in kwargs.keys():
tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
else:
- tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time,
- **kwargs)
+ tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time,
+ end_time,
+ **kwargs)
return tiles
@tile_data()
@@ -206,14 +233,16 @@ class NexusTileService(object):
:param kwargs: fetch_data: True/False = whether or not to retrieve tile data
:return:
"""
- tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time,
- end_time)
+ tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds,
+ start_time,
+ end_time)
return tiles
@tile_data()
def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
- return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
- **kwargs)
+ return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
+ rows=5000,
+ **kwargs)
def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
**kwargs):
@@ -225,7 +254,8 @@ class NexusTileService(object):
return tiles
def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0, end_time=-1, **kwargs):
- tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time, **kwargs)
+ tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time,
+ **kwargs)
tiles = self.mask_tiles_to_polygon(polygon, tiles)
if 0 < start_time <= end_time:
tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
@@ -260,7 +290,7 @@ class NexusTileService(object):
def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
- **kwargs)
+ **kwargs)
return tiles
@@ -446,12 +476,14 @@ class NexusTileService(object):
pass
try:
- tile.min_time = datetime.strptime(solr_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
+ tile.min_time = datetime.strptime(solr_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=UTC)
except KeyError:
pass
try:
- tile.max_time = datetime.strptime(solr_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
+ tile.max_time = datetime.strptime(solr_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=UTC)
except KeyError:
pass
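
Any callable taking keyword arguments can be passed as metrics_callback to the decorated query methods; the tile_data wrapper reports Solr time, Cassandra time, and tile count, and swallows callback exceptions so a bad callback cannot break a query. A hypothetical sketch (a configured NexusTileService and the dataset name are assumptions, not part of this commit):

    from nexustiles.nexustiles import NexusTileService

    def log_metrics(**kwargs):
        # receives e.g. {'solr': 0.13, 'cassandra': 0.42, 'num_tiles': 7}
        print(kwargs)

    tile_service = NexusTileService()  # assumes datastores are configured
    tiles = tile_service.find_tiles_in_box(-90., 90., -180., 180.,
                                           ds='MY_DATASET',  # hypothetical dataset
                                           start_time=0, end_time=-1,
                                           metrics_callback=log_metrics)
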
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index f4e206a..632357f 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -18,6 +18,10 @@ FROM nexusjpl/alpine-pyspark:2.4.4
MAINTAINER Apache SDAP "dev@sdap.apache.org"
+ARG CONDA_VERSION="4.7.12.1"
+ARG CONDA_MD5="81c773ff87af5cfac79ab862942ab6b3"
+ARG CONDA_DIR="/opt/conda"
+
ENV PYTHONPATH=${PYTHONPATH}:/opt/spark/python:/opt/spark/python/lib/py4j-0.10.7-src.zip:/opt/spark/python/lib/pyspark.zip/python:/usr/lib \
NEXUS_SRC=/tmp/incubator-sdap-nexus \
PROJ_LIB=/opt/conda/lib/python2.7/site-packages/pyproj/data \
@@ -52,10 +56,6 @@ RUN apk --no-cache add wget zlib && \
ln -s /lib/libc.musl-x86_64.so.1 /usr/glibc-compat/lib && \
ln -s /usr/lib/libgcc_s.so.1 /usr/glibc-compat/lib
-ARG CONDA_VERSION="4.7.12.1"
-ARG CONDA_MD5="81c773ff87af5cfac79ab862942ab6b3"
-ARG CONDA_DIR="/opt/conda"
-
COPY docker/nexus-webapp/install_conda.sh ./install_conda.sh
RUN /tmp/install_conda.sh