You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:26 UTC
[49/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all
code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/StandardDeviationSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
new file mode 100644
index 0000000..002a45d
--- /dev/null
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -0,0 +1,191 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+import logging
+from datetime import datetime
+from functools import partial
+
+from nexustiles.nexustiles import NexusTileServiceException
+from pytz import timezone
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusProcessingException, CustomEncoder
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+
+
+@nexus_handler
+class StandardDeviationSearchHandlerImpl(NexusHandler):
+ name = "Standard Deviation Search"
+ path = "/standardDeviation"
+ description = "Retrieves the pixel standard deviation if it exists for a given longitude and latitude"
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "One or more comma-separated dataset shortnames. Required."
+ },
+ "longitude": {
+ "name": "Longitude",
+ "type": "float",
+ "description": "Longitude in degrees from -180 to 180. Required."
+ },
+ "latitude": {
+ "name": "Latitude",
+ "type": "float",
+ "description": "Latitude in degrees from -90 to 90. Required."
+ },
+ "day": {
+ "name": "Day of Year",
+ "type": "int",
+ "description": "Day of year to search from 0 to 365. One of day or date are required but not both."
+ },
+ "date": {
+ "name": "Date",
+ "type": "string",
+ "description": "Datetime in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970). One of day "
+ "or date are required but not both."
+ },
+ "allInTile": {
+ "name": "Get all Standard Deviations in Tile",
+ "type": "boolean",
+ "description": "Optional True/False flag. If true, return the standard deviations for every pixel in the "
+ "tile that contains the searched lon/lat point. If false, return the "
+ "standard deviation only for the searched lon/lat point. Default: True"
+ }
+
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+ try:
+ ds = request.get_dataset()[0]
+ except:
+ raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+ try:
+ longitude = float(request.get_decimal_arg("longitude", default=None))
+ except:
+ raise NexusProcessingException(reason="'longitude' argument is required", code=400)
+
+ try:
+ latitude = float(request.get_decimal_arg("latitude", default=None))
+ except:
+ raise NexusProcessingException(reason="'latitude' argument is required", code=400)
+
+ search_datetime = request.get_datetime_arg('date', default=None)
+ day_of_year = request.get_int_arg('day', default=None)
+ if (search_datetime is not None and day_of_year is not None) \
+ or (search_datetime is None and day_of_year is None):
+ raise NexusProcessingException(
+ reason="At least one of 'day' or 'date' arguments are required but not both.",
+ code=400)
+
+ if search_datetime is not None:
+ day_of_year = search_datetime.timetuple().tm_yday
+
+ return_all = request.get_boolean_arg("allInTile", default=True)
+
+ return ds, longitude, latitude, day_of_year, return_all
+
+ def calc(self, request, **args):
+ raw_args_dict = {k: request.get_argument(k) for k in request.requestHandler.request.arguments}
+ ds, longitude, latitude, day_of_year, return_all = self.parse_arguments(request)
+
+ if return_all:
+ func = partial(get_all_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude,
+ latitude=latitude, day_of_year=day_of_year)
+ else:
+ func = partial(get_single_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude,
+ latitude=latitude, day_of_year=day_of_year)
+
+ try:
+ results = StandardDeviationSearchHandlerImpl.to_result_dict(func())
+ except (NoTileException, NoStandardDeviationException):
+ return StandardDeviationSearchResult(raw_args_dict, [])
+
+ return StandardDeviationSearchResult(raw_args_dict, results)
+
+ @staticmethod
+ def to_result_dict(list_of_tuples):
+ # list_of_tuples = [(lon, lat, st_dev)]
+ return [
+ {
+ "longitude": lon,
+ "latitude": lat,
+ "standard_deviation": st_dev
+ } for lon, lat, st_dev in list_of_tuples]
+
+
+class NoTileException(Exception):
+ pass
+
+
+class NoStandardDeviationException(Exception):
+ pass
+
+
+def find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year):
+ from shapely.geometry import Point
+ point = Point(longitude, latitude)
+
+ try:
+ tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(point, ds, day_of_year)[0]
+ except NexusTileServiceException:
+ raise NoTileException
+
+ # Check if this tile has any meta data that ends with 'std'. If it doesn't, just return nothing.
+ try:
+ st_dev_meta_name = next(iter([key for key in tile.meta_data.keys() if key.endswith('std')]))
+ except StopIteration:
+ raise NoStandardDeviationException
+
+ return tile, st_dev_meta_name
+
+
+def get_single_std_dev(tile_service, ds, longitude, latitude, day_of_year):
+ from scipy.spatial import distance
+
+ tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year)
+
+ # Need to find the closest point in the tile to the input lon/lat point and return only that result
+ valid_indices = tile.get_indices()
+ tile_points = [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx]]) for time_idx, lat_idx, lon_idx in
+ valid_indices]
+ closest_point_index = distance.cdist([(longitude, latitude)], tile_points).argmin()
+ closest_lon, closest_lat = tile_points[closest_point_index]
+ closest_point_tile_index = tuple(valid_indices[closest_point_index])
+ std_at_point = tile.meta_data[st_dev_meta_name][closest_point_tile_index]
+ return [tuple([closest_lon, closest_lat, std_at_point])]
+
+
+def get_all_std_dev(tile_service, ds, longitude, latitude, day_of_year):
+ tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year)
+
+ valid_indices = tile.get_indices()
+ return [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx],
+ tile.meta_data[st_dev_meta_name][time_idx, lat_idx, lon_idx]]) for time_idx, lat_idx, lon_idx in
+ valid_indices]
+
+
+class StandardDeviationSearchResult(object):
+ def __init__(self, request_params, results):
+ self.request_params = request_params
+ self.results = results
+
+ def toJson(self):
+ data = {
+ 'meta': self.request_params,
+ 'data': self.results,
+ 'stats': {}
+ }
+ return json.dumps(data, indent=4, cls=CustomEncoder)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TestInitializer.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TestInitializer.py b/analysis/webservice/algorithms/TestInitializer.py
new file mode 100644
index 0000000..3f47c6a
--- /dev/null
+++ b/analysis/webservice/algorithms/TestInitializer.py
@@ -0,0 +1,16 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+from webservice.NexusHandler import NexusHandler, nexus_initializer
+from nexustiles.nexustiles import NexusTileService
+
+@nexus_initializer
+class TestInitializer:
+
+ def __init__(self):
+ pass
+
+ def init(self, config):
+ print "*** TEST INITIALIZATION ***"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TileSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py
new file mode 100644
index 0000000..d1fade8
--- /dev/null
+++ b/analysis/webservice/algorithms/TileSearch.py
@@ -0,0 +1,71 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults
+
+
+# @nexus_handler
+class ChunkSearchHandlerImpl(NexusHandler):
+ name = "Data Tile Search"
+ path = "/tiles"
+ description = "Lists dataset tiles given a geographical area and time range"
+ singleton = True
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "One dataset shortname"
+ },
+ "minLat": {
+ "name": "Minimum Latitude",
+ "type": "float",
+ "description": "Minimum (Southern) bounding box Latitude"
+ },
+ "maxLat": {
+ "name": "Maximum Latitude",
+ "type": "float",
+ "description": "Maximum (Northern) bounding box Latitude"
+ },
+ "minLon": {
+ "name": "Minimum Longitude",
+ "type": "float",
+ "description": "Minimum (Western) bounding box Longitude"
+ },
+ "maxLon": {
+ "name": "Maximum Longitude",
+ "type": "float",
+ "description": "Maximum (Eastern) bounding box Longitude"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "long integer",
+ "description": "Starting time in seconds since midnight Jan. 1st, 1970 UTC"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "long integer",
+ "description": "Ending time in seconds since midnight Jan. 1st, 1970 UTC"
+ }
+ }
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+
+ def calc(self, computeOptions, **args):
+ minLat = computeOptions.get_min_lat()
+ maxLat = computeOptions.get_max_lat()
+ minLon = computeOptions.get_min_lon()
+ maxLon = computeOptions.get_max_lon()
+ ds = computeOptions.get_dataset()[0]
+ startTime = computeOptions.get_start_time()
+ endTime = computeOptions.get_end_time()
+ # TODO update to expect tile objects back
+ res = [tile.get_summary() for tile in
+ self._tile_service.find_tiles_in_box(minLat, maxLat, minLon, maxLon, ds, startTime, endTime,
+ fetch_data=False)]
+
+ res = NexusResults(results=res)
+ res.extendMeta(minLat, maxLat, minLon, maxLon, ds, startTime, endTime)
+ return res
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeAvgMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py
new file mode 100644
index 0000000..5c76604
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeAvgMap.py
@@ -0,0 +1,265 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+# distutils: include_dirs = /usr/local/lib/python2.7/site-packages/cassandra
+import pyximport
+
+pyximport.install()
+
+import sys
+import numpy as np
+from time import time
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NoDataException
+from netCDF4 import Dataset
+
+#from mpl_toolkits.basemap import Basemap
+
+
+# @nexus_handler
+class TimeAvgMapHandlerImpl(NexusHandler):
+
+ name = "Time Average Map"
+ path = "/timeAvgMap"
+ description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=False)
+
+ def _find_native_resolution(self):
+ # Get a quick set of tiles (1 degree at center of box) at 1 time stamp
+ midLat = (self._minLat+self._maxLat)/2
+ midLon = (self._minLon+self._maxLon)/2
+ ntiles = 0
+ t = self._endTime
+ t_incr = 86400
+ while ntiles == 0:
+ nexus_tiles = self._tile_service.get_tiles_bounded_by_box(midLat-0.5, midLat+0.5, midLon-0.5, midLon+0.5, ds=self._ds, start_time=t-t_incr, end_time=t)
+ ntiles = len(nexus_tiles)
+ print 'find_native_res: got %d tiles' % len(nexus_tiles)
+ sys.stdout.flush()
+ lat_res = 0.
+ lon_res = 0.
+ if ntiles > 0:
+ for tile in nexus_tiles:
+ if lat_res < 1e-10:
+ lats = tile.latitudes.compressed()
+ if (len(lats) > 1):
+ lat_res = lats[1] - lats[0]
+ if lon_res < 1e-10:
+ lons = tile.longitudes.compressed()
+ if (len(lons) > 1):
+ lon_res = lons[1] - lons[0]
+ if (lat_res >= 1e-10) and (lon_res >= 1e-10):
+ break
+ if (lat_res < 1e-10) or (lon_res < 1e-10):
+ t -= t_incr
+
+ self._latRes = lat_res
+ self._lonRes = lon_res
+
+ def _find_global_tile_set(self):
+ ntiles = 0
+ t = self._endTime
+ t_incr = 86400
+ while ntiles == 0:
+ nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=self._ds, start_time=t-t_incr, end_time=t)
+ ntiles = len(nexus_tiles)
+ print 'find_global_tile_set got %d tiles' % ntiles
+ sys.stdout.flush()
+ t -= t_incr
+ return nexus_tiles
+
+ def _prune_tiles(self, nexus_tiles):
+ del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0]
+ for i in np.flipud(del_ind):
+ del nexus_tiles[i]
+
+ #@staticmethod
+ #def _map(tile_in):
+ def _map(self, tile_in):
+ print 'Started tile %s' % tile_in.section_spec
+ print 'tile lats = ', tile_in.latitudes
+ print 'tile lons = ', tile_in.longitudes
+ print 'tile = ', tile_in.data
+ sys.stdout.flush()
+ lats = tile_in.latitudes
+ lons = tile_in.longitudes
+ if len(lats > 0) and len(lons > 0):
+ min_lat = np.ma.min(lats)
+ max_lat = np.ma.max(lats)
+ min_lon = np.ma.min(lons)
+ max_lon = np.ma.max(lons)
+ good_inds_lat = np.where(lats.mask == False)
+ good_inds_lon = np.where(lons.mask == False)
+ min_y = np.min(good_inds_lat)
+ max_y = np.max(good_inds_lat)
+ min_x = np.min(good_inds_lon)
+ max_x = np.max(good_inds_lon)
+ tile_inbounds_shape = (max_y-min_y+1, max_x-min_x+1)
+ days_at_a_time = 90
+ t_incr = 86400 * days_at_a_time
+ avg_tile = np.ma.array(np.zeros(tile_inbounds_shape,
+ dtype=np.float64))
+ cnt_tile = np.ma.array(np.zeros(tile_inbounds_shape,
+ dtype=np.uint32))
+ t_start = self._startTime
+ while t_start <= self._endTime:
+ t_end = min(t_start+t_incr,self._endTime)
+ t1 = time()
+ print 'nexus call start at time %f' % t1
+ sys.stdout.flush()
+ nexus_tiles = self._tile_service.get_tiles_bounded_by_box(min_lat-self._latRes/2, max_lat+self._latRes/2, min_lon-self._lonRes/2, max_lon+self._lonRes/2, ds=self._ds, start_time=t_start, end_time=t_end)
+ t2 = time()
+ print 'nexus call end at time %f' % t2
+ print 'secs in nexus call: ', t2-t1
+ sys.stdout.flush()
+ self._prune_tiles(nexus_tiles)
+ print 't %d to %d - Got %d tiles' % (t_start, t_end,
+ len(nexus_tiles))
+ sys.stdout.flush()
+ for tile in nexus_tiles:
+ tile.data.data[:,:] = np.nan_to_num(tile.data.data)
+ avg_tile.data[:,:] += tile.data[0,
+ min_y:max_y+1,
+ min_x:max_x+1]
+ cnt_tile.data[:,:] += (~tile.data.mask[0,
+ min_y:max_y+1,
+ min_x:max_x+1]).astype(np.uint8)
+ t_start = t_end + 1
+
+ print 'cnt_tile = ', cnt_tile
+ cnt_tile.mask = ~(cnt_tile.data.astype(bool))
+ avg_tile.mask = cnt_tile.mask
+ avg_tile /= cnt_tile
+ print 'Finished tile %s' % tile_in.section_spec
+ print 'Tile avg = ', avg_tile
+ sys.stdout.flush()
+ else:
+ avg_tile = None
+ min_lat = None
+ max_lat = None
+ min_lon = None
+ max_lon = None
+ print 'Tile %s outside of bounding box' % tile_in.section_spec
+ sys.stdout.flush()
+ return (avg_tile,min_lat,max_lat,min_lon,max_lon)
+
+ def _lat2ind(self,lat):
+ return int((lat-self._minLatCent)/self._latRes)
+
+ def _lon2ind(self,lon):
+ return int((lon-self._minLonCent)/self._lonRes)
+
+ def _create_nc_file(self, a):
+ print 'a=',a
+ print 'shape a = ', a.shape
+ sys.stdout.flush()
+ lat_dim, lon_dim = a.shape
+ rootgrp = Dataset("tam.nc", "w", format="NETCDF4")
+ rootgrp.createDimension("lat", lat_dim)
+ rootgrp.createDimension("lon", lon_dim)
+ rootgrp.createVariable("TRMM_3B42_daily_precipitation_V7", "f4",
+ dimensions=("lat","lon",))
+ rootgrp.createVariable("lat", "f4", dimensions=("lat",))
+ rootgrp.createVariable("lon", "f4", dimensions=("lon",))
+ rootgrp.variables["TRMM_3B42_daily_precipitation_V7"][:,:] = a
+ rootgrp.variables["lat"][:] = np.linspace(self._minLatCent,
+ self._maxLatCent, lat_dim)
+ rootgrp.variables["lon"][:] = np.linspace(self._minLonCent,
+ self._maxLonCent, lon_dim)
+ rootgrp.close()
+
+ def calc(self, computeOptions, **args):
+ """
+
+ :param computeOptions: StatsComputeOptions
+ :param args: dict
+ :return:
+ """
+
+ self._minLat = float(computeOptions.get_min_lat())
+ self._maxLat = float(computeOptions.get_max_lat())
+ self._minLon = float(computeOptions.get_min_lon())
+ self._maxLon = float(computeOptions.get_max_lon())
+ self._ds = computeOptions.get_dataset()[0]
+ self._startTime = computeOptions.get_start_time()
+ self._endTime = computeOptions.get_end_time()
+
+ self._find_native_resolution()
+ print 'Using Native resolution: lat_res=%f, lon_res=%f' % (self._latRes, self._lonRes)
+ self._minLatCent = self._minLat + self._latRes / 2
+ self._minLonCent = self._minLon + self._lonRes / 2
+ nlats = int((self._maxLat-self._minLatCent)/self._latRes)+1
+ nlons = int((self._maxLon-self._minLonCent)/self._lonRes)+1
+ self._maxLatCent = self._minLatCent + (nlats-1) * self._latRes
+ self._maxLonCent = self._minLonCent + (nlons-1) * self._lonRes
+ print 'nlats=',nlats,'nlons=',nlons
+ print 'center lat range = %f to %f' % (self._minLatCent,
+ self._maxLatCent)
+ print 'center lon range = %f to %f' % (self._minLonCent,
+ self._maxLonCent)
+ sys.stdout.flush()
+ a = np.zeros((nlats, nlons),dtype=np.float64,order='C')
+
+ nexus_tiles = self._find_global_tile_set()
+ # print 'tiles:'
+ # for tile in nexus_tiles:
+ # print tile.granule
+ # print tile.section_spec
+ # print 'lat:', tile.latitudes
+ # print 'lon:', tile.longitudes
+
+ # nexus_tiles)
+ if len(nexus_tiles) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ print 'Initially found %d tiles' % len(nexus_tiles)
+ sys.stdout.flush()
+ self._prune_tiles(nexus_tiles)
+ print 'Pruned to %d tiles' % len(nexus_tiles)
+ sys.stdout.flush()
+ #for tile in nexus_tiles:
+ # print 'lats: ', tile.latitudes.compressed()
+ # print 'lons: ', tile.longitudes.compressed()
+
+ avg_tiles = map(self._map, nexus_tiles)
+ print 'shape a = ', a.shape
+ sys.stdout.flush()
+ # The tiles below are NOT Nexus objects. They are tuples
+ # with the time avg map data and lat-lon bounding box.
+ for tile in avg_tiles:
+ if tile is not None:
+ (tile_data, tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon) = tile
+ print 'shape tile_data = ', tile_data.shape
+ print 'tile data mask = ', tile_data.mask
+ sys.stdout.flush()
+ if np.any(np.logical_not(tile_data.mask)):
+ y0 = self._lat2ind(tile_min_lat)
+ y1 = self._lat2ind(tile_max_lat)
+ x0 = self._lon2ind(tile_min_lon)
+ x1 = self._lon2ind(tile_max_lon)
+ print 'writing tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \
+ (tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon, y0, y1, x0, x1)
+ sys.stdout.flush()
+ a[y0:y1+1,x0:x1+1] = tile_data
+ else:
+ print 'All pixels masked in tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \
+ (tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon, y0, y1, x0, x1)
+ sys.stdout.flush()
+
+ self._create_nc_file(a)
+
+ return TimeAvgMapResults(results={}, meta={}, computeOptions=computeOptions)
+
+
+class TimeAvgMapResults(NexusResults):
+
+ def __init__(self, results=None, meta=None, computeOptions=None):
+ NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeries.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py
new file mode 100644
index 0000000..858fea4
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeSeries.py
@@ -0,0 +1,549 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import calendar
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.dates as mdates
+import matplotlib.pyplot as plt
+import numpy as np
+import pytz
+import shapely.geometry
+import shapely.wkt
+from backports.functools_lru_cache import lru_cache
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
+from scipy import stats
+
+from webservice import Filtering as filtering
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class TimeSeriesHandlerImpl(NexusHandler):
+ name = "Time Series"
+ path = "/stats"
+ description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "comma-delimited string",
+ "description": "The dataset(s) Used to generate the Time Series. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "seasonalFilter": {
+ "name": "Compute Seasonal Cycle Filter",
+ "type": "boolean",
+ "description": "Flag used to specify if the seasonal averages should be computed during "
+ "Time Series computation. Optional (Default: True)"
+ },
+ "lowPassFilter": {
+ "name": "Compute Low Pass Filter",
+ "type": "boolean",
+ "description": "Flag used to specify if a low pass filter should be computed during "
+ "Time Series computation. Optional (Default: True)"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ try:
+ ds = request.get_dataset()
+ if type(ds) != list and type(ds) != tuple:
+ ds = (ds,)
+ except:
+ raise NexusProcessingException(
+ reason="'ds' argument is required. Must be comma-delimited string",
+ code=400)
+
+ # Do not allow time series on Climatology
+ if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+ raise NexusProcessingException(reason="Cannot compute time series on a climatology", code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ request.get_min_lon = lambda: bounding_polygon.bounds[0]
+ request.get_min_lat = lambda: bounding_polygon.bounds[1]
+ request.get_max_lon = lambda: bounding_polygon.bounds[2]
+ request.get_max_lat = lambda: bounding_polygon.bounds[3]
+ except:
+ try:
+ west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+ request.get_max_lon(), request.get_max_lat()
+ bounding_polygon = shapely.geometry.Polygon(
+ [(west, south), (east, south), (east, north), (west, north), (west, south)])
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as "
+ "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter()
+ apply_low_pass_filter = request.get_apply_low_pass_filter()
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+ apply_seasonal_cycle_filter, apply_low_pass_filter
+
+ def calc(self, request, **args):
+ """
+
+ :param request: StatsComputeOptions
+ :param args: dict
+ :return:
+ """
+
+ ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+ apply_seasonal_cycle_filter, apply_low_pass_filter = self.parse_arguments(request)
+
+ resultsRaw = []
+
+ for shortName in ds:
+ results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(bounding_polygon,
+ shortName,
+ start_seconds_from_epoch,
+ end_seconds_from_epoch,
+ apply_seasonal_cycle_filter=apply_seasonal_cycle_filter,
+ apply_low_pass_filter=apply_low_pass_filter)
+ resultsRaw.append([results, meta])
+
+ the_time = datetime.now()
+ results = self._mergeResults(resultsRaw)
+
+ if len(ds) == 2:
+ try:
+ stats = TimeSeriesHandlerImpl.calculate_comparison_stats(results)
+ except Exception:
+ stats = {}
+ tb = traceback.format_exc()
+ self.log.warn("Error when calculating comparison stats:\n%s" % tb)
+ else:
+ stats = {}
+
+ meta = []
+ for singleRes in resultsRaw:
+ meta.append(singleRes[1])
+
+ res = TimeSeriesResults(results=results, meta=meta, stats=stats,
+ computeOptions=None, minLat=bounding_polygon.bounds[1],
+ maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
+ maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
+ endTime=end_seconds_from_epoch)
+
+ self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
+ return res
+
+ def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch,
+ end_seconds_from_epoch,
+ apply_seasonal_cycle_filter=True, apply_low_pass_filter=True):
+
+ the_time = datetime.now()
+ daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
+ bounding_polygon.bounds[3],
+ bounding_polygon.bounds[0],
+ bounding_polygon.bounds[2],
+ ds,
+ start_seconds_from_epoch,
+ end_seconds_from_epoch)
+ self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+ if len(daysinrange) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ the_time = datetime.now()
+ maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+ results = []
+ if maxprocesses == 1:
+ calculator = TimeSeriesCalculator()
+ for dayinseconds in daysinrange:
+ result = calculator.calc_average_on_day(bounding_polygon.wkt, ds, dayinseconds)
+ results += [result] if result else []
+ else:
+ # Create a task to calc average difference for each day
+ manager = Manager()
+ work_queue = manager.Queue()
+ done_queue = manager.Queue()
+ for dayinseconds in daysinrange:
+ work_queue.put(
+ ('calc_average_on_day', bounding_polygon.wkt, ds, dayinseconds))
+ [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+ # Start new processes to handle the work
+ pool = Pool(maxprocesses)
+ [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+ pool.close()
+
+ # Collect the results as [(day (in ms), average difference for that day)]
+ for i in xrange(0, len(daysinrange)):
+ result = done_queue.get()
+ try:
+ error_str = result['error']
+ self.log.error(error_str)
+ raise NexusProcessingException(reason="Error calculating average by day.")
+ except KeyError:
+ pass
+
+ results += [result] if result else []
+
+ pool.terminate()
+ manager.shutdown()
+
+ results = sorted(results, key=lambda entry: entry["time"])
+ self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+ if apply_seasonal_cycle_filter:
+ the_time = datetime.now()
+ for result in results:
+ month = datetime.utcfromtimestamp(result['time']).month
+ month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt, ds)
+ seasonal_mean = result['mean'] - month_mean
+ seasonal_min = result['min'] - month_min
+ seasonal_max = result['max'] - month_max
+ result['meanSeasonal'] = seasonal_mean
+ result['minSeasonal'] = seasonal_min
+ result['maxSeasonal'] = seasonal_max
+ self.log.info(
+ "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+ the_time = datetime.now()
+ filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+ filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+ filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+
+ if apply_seasonal_cycle_filter and apply_low_pass_filter:
+ try:
+ filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
+ append="LowPass")
+ filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
+ append="LowPass")
+ filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
+ append="LowPass")
+ except Exception as e:
+ # If it doesn't work log the error but ignore it
+ tb = traceback.format_exc()
+ self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+
+ self.log.info(
+ "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+ return results, {}
+
+ @lru_cache()
+ def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
+
+ min_date, max_date = self.get_min_max_date(ds=ds)
+
+ monthly_averages, monthly_counts = [], []
+ monthly_mins, monthly_maxes = [], []
+ bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
+ for year in range(min_date.year, max_date.year + 1):
+ beginning_of_month = datetime(year, month, 1)
+ end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
+ start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
+ end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
+ tile_stats = self._tile_service.find_tiles_in_polygon(bounding_polygon, ds, start, end,
+ fl=('id,'
+ 'tile_avg_val_d,tile_count_i,'
+ 'tile_min_val_d,tile_max_val_d,'
+ 'tile_min_lat,tile_max_lat,'
+ 'tile_min_lon,tile_max_lon'),
+ fetch_data=False)
+ if len(tile_stats) == 0:
+ continue
+
+ # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
+ border_tiles, inner_tiles = [], []
+ for tile in tile_stats:
+ inner_tiles.append(tile) if bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon,
+ tile.bbox.min_lat,
+ tile.bbox.max_lon,
+ tile.bbox.max_lat)) else border_tiles.append(
+ tile)
+
+ # We can use the stats of the inner tiles directly
+ tile_means = [tile.tile_stats.mean for tile in inner_tiles]
+ tile_mins = [tile.tile_stats.min for tile in inner_tiles]
+ tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
+ tile_counts = [tile.tile_stats.count for tile in inner_tiles]
+
+ # Border tiles need have the data loaded, masked, and stats recalculated
+ border_tiles = list(self._tile_service.fetch_data_for_tiles(*border_tiles))
+ border_tiles = self._tile_service.mask_tiles_to_polygon(bounding_polygon, border_tiles)
+ for tile in border_tiles:
+ tile.update_stats()
+ tile_means.append(tile.tile_stats.mean)
+ tile_mins.append(tile.tile_stats.min)
+ tile_maxes.append(tile.tile_stats.max)
+ tile_counts.append(tile.tile_stats.count)
+
+ tile_means = np.array(tile_means)
+ tile_mins = np.array(tile_mins)
+ tile_maxes = np.array(tile_maxes)
+ tile_counts = np.array(tile_counts)
+
+ sum_tile_counts = np.sum(tile_counts) * 1.0
+
+ monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
+ monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
+ monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
+ monthly_counts += [sum_tile_counts]
+
+ count_sum = np.sum(monthly_counts) * 1.0
+ weights = np.array(monthly_counts) / count_sum
+
+ return np.average(monthly_averages, None, weights).item(), \
+ np.average(monthly_averages, None, weights).item(), \
+ np.average(monthly_averages, None, weights).item()
+
+ @lru_cache()
+ def get_min_max_date(self, ds=None):
+ min_date = pytz.timezone('UTC').localize(
+ datetime.utcfromtimestamp(self._tile_service.get_min_time([], ds=ds)))
+ max_date = pytz.timezone('UTC').localize(
+ datetime.utcfromtimestamp(self._tile_service.get_max_time([], ds=ds)))
+
+ return min_date.date(), max_date.date()
+
+ @staticmethod
+ def calculate_comparison_stats(results):
+ xy = [[], []]
+
+ for item in results:
+ if len(item) == 2:
+ xy[item[0]["ds"]].append(item[0]["mean"])
+ xy[item[1]["ds"]].append(item[1]["mean"])
+
+ slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
+ comparisonStats = {
+ "slope": slope,
+ "intercept": intercept,
+ "r": r_value,
+ "p": p_value,
+ "err": std_err
+ }
+
+ return comparisonStats
+
+
+class TimeSeriesResults(NexusResults):
+ LINE_PLOT = "line"
+ SCATTER_PLOT = "scatter"
+
+ __SERIES_COLORS = ['red', 'blue']
+
+ def toImage(self):
+
+ type = self.computeOptions().get_plot_type()
+
+ if type == TimeSeriesResults.LINE_PLOT or type == "default":
+ return self.createLinePlot()
+ elif type == TimeSeriesResults.SCATTER_PLOT:
+ return self.createScatterPlot()
+ else:
+ raise Exception("Invalid or unsupported time series plot specified")
+
+ def createScatterPlot(self):
+ timeSeries = []
+ series0 = []
+ series1 = []
+
+ res = self.results()
+ meta = self.meta()
+
+ plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "mean"
+
+ for m in res:
+ if len(m) == 2:
+ timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
+ series0.append(m[0][plotSeries])
+ series1.append(m[1][plotSeries])
+
+ title = ', '.join(set([m['title'] for m in meta]))
+ sources = ', '.join(set([m['source'] for m in meta]))
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ fig, ax = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ ax.scatter(series0, series1, alpha=0.5)
+ ax.set_xlabel(meta[0]['units'])
+ ax.set_ylabel(meta[1]['units'])
+ ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+
+ par = np.polyfit(series0, series1, 1, full=True)
+ slope = par[0][0]
+ intercept = par[0][1]
+ xl = [min(series0), max(series0)]
+ yl = [slope * xx + intercept for xx in xl]
+ plt.plot(xl, yl, '-r')
+
+ ax.grid(True)
+ fig.tight_layout()
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+ def createLinePlot(self):
+ nseries = len(self.meta())
+ res = self.results()
+ meta = self.meta()
+
+ timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]
+
+ means = [[np.nan] * len(res) for n in range(0, nseries)]
+
+ plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "mean"
+
+ for n in range(0, len(res)):
+ timeSlot = res[n]
+ for seriesValues in timeSlot:
+ means[seriesValues['ds']][n] = seriesValues[plotSeries]
+
+ x = timeSeries
+
+ fig, axMain = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ fig.autofmt_xdate()
+
+ title = ', '.join(set([m['title'] for m in meta]))
+ sources = ', '.join(set([m['source'] for m in meta]))
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+ axMain.set_xlabel('Date')
+ axMain.grid(True)
+ axMain.xaxis.set_major_locator(mdates.YearLocator())
+ axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
+ axMain.xaxis.set_minor_locator(mdates.MonthLocator())
+ axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')
+
+ plots = []
+
+ for n in range(0, nseries):
+ if n == 0:
+ ax = axMain
+ else:
+ ax = ax.twinx()
+
+ plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title'])
+ ax.set_ylabel(meta[n]['units'])
+
+ labs = [l.get_label() for l in plots]
+ axMain.legend(plots, labs, loc=0)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+
+class TimeSeriesCalculator(object):
+ def __init__(self):
+ self.__tile_service = NexusTileService()
+
+ def calc_average_on_day(self, bounding_polygon_wkt, dataset, timeinseconds):
+ bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
+ ds1_nexus_tiles = self.__tile_service.get_tiles_bounded_by_polygon_at_time(bounding_polygon,
+ dataset,
+ timeinseconds)
+
+ # If all data ends up getting masked, ds1_nexus_tiles will be empty
+ if len(ds1_nexus_tiles) == 0:
+ return {}
+
+ tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles])
+ data_min = np.ma.min(tile_data_agg)
+ data_max = np.ma.max(tile_data_agg)
+ daily_mean = np.ma.mean(tile_data_agg).item()
+ data_count = np.ma.count(tile_data_agg)
+ try:
+ data_count = data_count.item()
+ except AttributeError:
+ pass
+ data_std = np.ma.std(tile_data_agg)
+
+ # Return Stats by day
+ stat = {
+ 'min': data_min,
+ 'max': data_max,
+ 'mean': daily_mean,
+ 'cnt': data_count,
+ 'std': data_std,
+ 'time': int(timeinseconds)
+ }
+ return stat
+
+
+def pool_worker(work_queue, done_queue):
+ try:
+ calculator = TimeSeriesCalculator()
+
+ for work in iter(work_queue.get, SENTINEL):
+ scifunction = work[0]
+ args = work[1:]
+ result = calculator.__getattribute__(scifunction)(*args)
+ done_queue.put(result)
+
+ except Exception as e:
+ e_str = traceback.format_exc(e)
+ done_queue.put({'error': e_str})
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeriesSolr.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py
new file mode 100644
index 0000000..1da55b2
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeSeriesSolr.py
@@ -0,0 +1,342 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import sys
+import traceback
+import logging
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.dates as mdates
+import matplotlib.pyplot as plt
+import numpy as np
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from nexustiles.nexustiles import NexusTileService
+from scipy import stats
+
+from webservice import Filtering as filt
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+SENTINEL = 'STOP'
+
+
+@nexus_handler
+class TimeSeriesHandlerImpl(NexusHandler):
+ name = "Time Series Solr"
+ path = "/statsSolr"
+ description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+ self.log = logging.getLogger(__name__)
+
+ def calc(self, computeOptions, **args):
+ """
+
+ :param computeOptions: StatsComputeOptions
+ :param args: dict
+ :return:
+ """
+
+ ds = computeOptions.get_dataset()
+
+ if type(ds) != list and type(ds) != tuple:
+ ds = (ds,)
+
+ resultsRaw = []
+
+ for shortName in ds:
+ results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(computeOptions.get_min_lat(),
+ computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(),
+ computeOptions.get_max_lon(),
+ shortName,
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time(),
+ computeOptions.get_apply_seasonal_cycle_filter(),
+ computeOptions.get_apply_low_pass_filter())
+ resultsRaw.append([results, meta])
+
+ results = self._mergeResults(resultsRaw)
+
+ if len(ds) == 2:
+ stats = self.calculateComparisonStats(results, suffix="")
+ if computeOptions.get_apply_seasonal_cycle_filter():
+ s = self.calculateComparisonStats(results, suffix="Seasonal")
+ stats = self._mergeDicts(stats, s)
+ if computeOptions.get_apply_low_pass_filter():
+ s = self.calculateComparisonStats(results, suffix="LowPass")
+ stats = self._mergeDicts(stats, s)
+ if computeOptions.get_apply_seasonal_cycle_filter() and computeOptions.get_apply_low_pass_filter():
+ s = self.calculateComparisonStats(results, suffix="SeasonalLowPass")
+ stats = self._mergeDicts(stats, s)
+ else:
+ stats = {}
+
+ meta = []
+ for singleRes in resultsRaw:
+ meta.append(singleRes[1])
+
+ res = TimeSeriesResults(results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+ return res
+
+ def getTimeSeriesStatsForBoxSingleDataSet(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0, end_time=-1,
+ applySeasonalFilter=True, applyLowPass=True):
+
+ daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+ end_time)
+
+ if len(daysinrange) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+ results = []
+ if maxprocesses == 1:
+ calculator = TimeSeriesCalculator()
+ for dayinseconds in daysinrange:
+ result = calculator.calc_average_on_day(min_lat, max_lat, min_lon, max_lon, ds, dayinseconds)
+ results.append(result)
+ else:
+ # Create a task to calc average difference for each day
+ manager = Manager()
+ work_queue = manager.Queue()
+ done_queue = manager.Queue()
+ for dayinseconds in daysinrange:
+ work_queue.put(
+ ('calc_average_on_day', min_lat, max_lat, min_lon, max_lon, ds, dayinseconds))
+ [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+ # Start new processes to handle the work
+ pool = Pool(maxprocesses)
+ [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+ pool.close()
+
+ # Collect the results as [(day (in ms), average difference for that day)]
+ for i in xrange(0, len(daysinrange)):
+ result = done_queue.get()
+ try:
+ error_str = result['error']
+ self.log.error(error_str)
+ raise NexusProcessingException(reason="Error calculating average by day.")
+ except KeyError:
+ pass
+
+ results.append(result)
+
+ pool.terminate()
+ manager.shutdown()
+
+ results = sorted(results, key=lambda entry: entry["time"])
+
+ filt.applyAllFiltersOnField(results, 'mean', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+ filt.applyAllFiltersOnField(results, 'max', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+ filt.applyAllFiltersOnField(results, 'min', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+
+ return results, {}
+
+ def calculateComparisonStats(self, results, suffix=""):
+
+ xy = [[], []]
+
+ for item in results:
+ if len(item) == 2:
+ xy[item[0]["ds"]].append(item[0]["mean%s" % suffix])
+ xy[item[1]["ds"]].append(item[1]["mean%s" % suffix])
+
+ slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
+ comparisonStats = {
+ "slope%s" % suffix: slope,
+ "intercept%s" % suffix: intercept,
+ "r%s" % suffix: r_value,
+ "p%s" % suffix: p_value,
+ "err%s" % suffix: std_err
+ }
+
+ return comparisonStats
+
+
+class TimeSeriesResults(NexusResults):
+ LINE_PLOT = "line"
+ SCATTER_PLOT = "scatter"
+
+ __SERIES_COLORS = ['red', 'blue']
+
+ def __init__(self, results=None, meta=None, stats=None, computeOptions=None):
+ NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+
+ def toImage(self):
+
+ type = self.computeOptions().get_plot_type()
+
+ if type == TimeSeriesResults.LINE_PLOT or type == "default":
+ return self.createLinePlot()
+ elif type == TimeSeriesResults.SCATTER_PLOT:
+ return self.createScatterPlot()
+ else:
+ raise Exception("Invalid or unsupported time series plot specified")
+
+ def createScatterPlot(self):
+ timeSeries = []
+ series0 = []
+ series1 = []
+
+ res = self.results()
+ meta = self.meta()
+
+ plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "mean"
+
+ for m in res:
+ if len(m) == 2:
+ timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
+ series0.append(m[0][plotSeries])
+ series1.append(m[1][plotSeries])
+
+ title = ', '.join(set([m['title'] for m in meta]))
+ sources = ', '.join(set([m['source'] for m in meta]))
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ fig, ax = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ ax.scatter(series0, series1, alpha=0.5)
+ ax.set_xlabel(meta[0]['units'])
+ ax.set_ylabel(meta[1]['units'])
+ ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+
+ par = np.polyfit(series0, series1, 1, full=True)
+ slope = par[0][0]
+ intercept = par[0][1]
+ xl = [min(series0), max(series0)]
+ yl = [slope * xx + intercept for xx in xl]
+ plt.plot(xl, yl, '-r')
+
+ # r = self.stats()["r"]
+ # plt.text(0.5, 0.5, "r = foo")
+
+ ax.grid(True)
+ fig.tight_layout()
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+ def createLinePlot(self):
+ nseries = len(self.meta())
+ res = self.results()
+ meta = self.meta()
+
+ timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]
+
+ means = [[np.nan] * len(res) for n in range(0, nseries)]
+
+ plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "mean"
+
+ for n in range(0, len(res)):
+ timeSlot = res[n]
+ for seriesValues in timeSlot:
+ means[seriesValues['ds']][n] = seriesValues[plotSeries]
+
+ x = timeSeries
+
+ fig, axMain = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ fig.autofmt_xdate()
+
+ title = ', '.join(set([m['title'] for m in meta]))
+ sources = ', '.join(set([m['source'] for m in meta]))
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+ axMain.set_xlabel('Date')
+ axMain.grid(True)
+ axMain.xaxis.set_major_locator(mdates.YearLocator())
+ axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
+ axMain.xaxis.set_minor_locator(mdates.MonthLocator())
+ axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')
+
+ plots = []
+
+ for n in range(0, nseries):
+ if n == 0:
+ ax = axMain
+ else:
+ ax = ax.twinx()
+
+ plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title'])
+ ax.set_ylabel(meta[n]['units'])
+
+ labs = [l.get_label() for l in plots]
+ axMain.legend(plots, labs, loc=0)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+
+class TimeSeriesCalculator(object):
+ def __init__(self):
+ self.__tile_service = NexusTileService()
+
+ def calc_average_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset, timeinseconds):
+ # Get stats using solr only
+ ds1_nexus_tiles_stats = self.__tile_service.get_stats_within_box_at_time(min_lat, max_lat, min_lon, max_lon,
+ dataset,
+ timeinseconds)
+
+ data_min_within = min([tile["tile_min_val_d"] for tile in ds1_nexus_tiles_stats])
+ data_max_within = max([tile["tile_max_val_d"] for tile in ds1_nexus_tiles_stats])
+ data_sum_within = sum([tile["product(tile_avg_val_d, tile_count_i)"] for tile in ds1_nexus_tiles_stats])
+ data_count_within = sum([tile["tile_count_i"] for tile in ds1_nexus_tiles_stats])
+
+ # Get boundary tiles and calculate stats
+ ds1_nexus_tiles = self.__tile_service.get_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon,
+ dataset,
+ timeinseconds)
+
+ tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles])
+ data_min_boundary = np.ma.min(tile_data_agg)
+ data_max_boundary = np.ma.max(tile_data_agg)
+ #daily_mean = np.ma.mean(tile_data_agg).item()
+ data_sum_boundary = np.ma.sum(tile_data_agg)
+ data_count_boundary = np.ma.count(tile_data_agg).item()
+ #data_std = np.ma.std(tile_data_agg)
+
+ # Combine stats
+ data_min = min(data_min_within, data_min_boundary)
+ data_max = max(data_max_within, data_max_boundary)
+ data_count = data_count_within + data_count_boundary
+ daily_mean = (data_sum_within + data_sum_boundary) / data_count
+ data_std = 0
+
+ # Return Stats by day
+ stat = {
+ 'min': data_min,
+ 'max': data_max,
+ 'mean': daily_mean,
+ 'cnt': data_count,
+ 'std': data_std,
+ 'time': int(timeinseconds)
+ }
+ return stat
+
+
+def pool_worker(work_queue, done_queue):
+ try:
+ calculator = TimeSeriesCalculator()
+
+ for work in iter(work_queue.get, SENTINEL):
+ scifunction = work[0]
+ args = work[1:]
+ result = calculator.__getattribute__(scifunction)(*args)
+ done_queue.put(result)
+ except Exception as e:
+ e_str = traceback.format_exc(e)
+ done_queue.put({'error': e_str})
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/__init__.py b/analysis/webservice/algorithms/__init__.py
new file mode 100644
index 0000000..fb5ecc8
--- /dev/null
+++ b/analysis/webservice/algorithms/__init__.py
@@ -0,0 +1,20 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import Capabilities
+import CorrelationMap
+import DailyDifferenceAverage
+import DataInBoundsSearch
+import DataSeriesList
+import DelayTest
+import ErrorTosserTest
+import Heartbeat
+import HofMoeller
+import LongitudeLatitudeMap
+import TestInitializer
+import TileSearch
+import TimeAvgMap
+import TimeSeries
+import TimeSeriesSolr
+import StandardDeviationSearch
\ No newline at end of file