Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:26 UTC

[49/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/StandardDeviationSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
new file mode 100644
index 0000000..002a45d
--- /dev/null
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -0,0 +1,191 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+import logging
+from datetime import datetime
+from functools import partial
+
+from nexustiles.nexustiles import NexusTileServiceException
+from pytz import timezone
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusProcessingException, CustomEncoder
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+
+
+@nexus_handler
+class StandardDeviationSearchHandlerImpl(NexusHandler):
+    name = "Standard Deviation Search"
+    path = "/standardDeviation"
+    description = "Retrieves the pixel standard deviation if it exists for a given longitude and latitude"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "One or more comma-separated dataset shortnames. Required."
+        },
+        "longitude": {
+            "name": "Longitude",
+            "type": "float",
+            "description": "Longitude in degrees from -180 to 180. Required."
+        },
+        "latitude": {
+            "name": "Latitude",
+            "type": "float",
+            "description": "Latitude in degrees from -90 to 90. Required."
+        },
+        "day": {
+            "name": "Day of Year",
+            "type": "int",
+            "description": "Day of year to search from 0 to 365. One of day or date are required but not both."
+        },
+        "date": {
+            "name": "Date",
+            "type": "string",
+            "description": "Datetime in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970). One of day "
+                           "or date are required but not both."
+        },
+        "allInTile": {
+            "name": "Get all Standard Deviations in Tile",
+            "type": "boolean",
+            "description": "Optional True/False flag. If true, return the standard deviations for every pixel in the "
+                           "tile that contains the searched lon/lat point. If false, return the "
+                           "standard deviation only for the searched lon/lat point. Default: True"
+        }
+
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+        try:
+            ds = request.get_dataset()[0]
+        except:
+            raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+        try:
+            longitude = float(request.get_decimal_arg("longitude", default=None))
+        except:
+            raise NexusProcessingException(reason="'longitude' argument is required", code=400)
+
+        try:
+            latitude = float(request.get_decimal_arg("latitude", default=None))
+        except:
+            raise NexusProcessingException(reason="'latitude' argument is required", code=400)
+
+        search_datetime = request.get_datetime_arg('date', default=None)
+        day_of_year = request.get_int_arg('day', default=None)
+        if (search_datetime is not None and day_of_year is not None) \
+                or (search_datetime is None and day_of_year is None):
+            raise NexusProcessingException(
+                reason="At least one of 'day' or 'date' arguments are required but not both.",
+                code=400)
+
+        if search_datetime is not None:
+            day_of_year = search_datetime.timetuple().tm_yday
+
+        return_all = request.get_boolean_arg("allInTile", default=True)
+
+        return ds, longitude, latitude, day_of_year, return_all
+
+    def calc(self, request, **args):
+        raw_args_dict = {k: request.get_argument(k) for k in request.requestHandler.request.arguments}
+        ds, longitude, latitude, day_of_year, return_all = self.parse_arguments(request)
+
+        if return_all:
+            func = partial(get_all_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude,
+                           latitude=latitude, day_of_year=day_of_year)
+        else:
+            func = partial(get_single_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude,
+                           latitude=latitude, day_of_year=day_of_year)
+
+        try:
+            results = StandardDeviationSearchHandlerImpl.to_result_dict(func())
+        except (NoTileException, NoStandardDeviationException):
+            return StandardDeviationSearchResult(raw_args_dict, [])
+
+        return StandardDeviationSearchResult(raw_args_dict, results)
+
+    @staticmethod
+    def to_result_dict(list_of_tuples):
+        # list_of_tuples = [(lon, lat, st_dev)]
+        return [
+            {
+                "longitude": lon,
+                "latitude": lat,
+                "standard_deviation": st_dev
+            } for lon, lat, st_dev in list_of_tuples]
+
+
+class NoTileException(Exception):
+    pass
+
+
+class NoStandardDeviationException(Exception):
+    pass
+
+
+def find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year):
+    from shapely.geometry import Point
+    point = Point(longitude, latitude)
+
+    try:
+        tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(point, ds, day_of_year)[0]
+    except NexusTileServiceException:
+        raise NoTileException
+
+    # Check whether this tile has any metadata key ending in 'std'. If not, raise NoStandardDeviationException.
+    try:
+        st_dev_meta_name = next(iter([key for key in tile.meta_data.keys() if key.endswith('std')]))
+    except StopIteration:
+        raise NoStandardDeviationException
+
+    return tile, st_dev_meta_name
+
+
+def get_single_std_dev(tile_service, ds, longitude, latitude, day_of_year):
+    from scipy.spatial import distance
+
+    tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year)
+
+    # Need to find the closest point in the tile to the input lon/lat point and return only that result
+    valid_indices = tile.get_indices()
+    tile_points = [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx]]) for time_idx, lat_idx, lon_idx in
+                   valid_indices]
+    closest_point_index = distance.cdist([(longitude, latitude)], tile_points).argmin()
+    closest_lon, closest_lat = tile_points[closest_point_index]
+    closest_point_tile_index = tuple(valid_indices[closest_point_index])
+    std_at_point = tile.meta_data[st_dev_meta_name][closest_point_tile_index]
+    return [tuple([closest_lon, closest_lat, std_at_point])]
+
+
+def get_all_std_dev(tile_service, ds, longitude, latitude, day_of_year):
+    tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year)
+
+    valid_indices = tile.get_indices()
+    return [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx],
+                   tile.meta_data[st_dev_meta_name][time_idx, lat_idx, lon_idx]]) for time_idx, lat_idx, lon_idx in
+            valid_indices]
+
+
+class StandardDeviationSearchResult(object):
+    def __init__(self, request_params, results):
+        self.request_params = request_params
+        self.results = results
+
+    def toJson(self):
+        data = {
+            'meta': self.request_params,
+            'data': self.results,
+            'stats': {}
+        }
+        return json.dumps(data, indent=4, cls=CustomEncoder)
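
For orientation, here is a minimal sketch of how the /standardDeviation endpoint defined above might be queried once the web service is running. The host, port, and dataset shortname below are illustrative assumptions, not values taken from this commit:

    import json
    import urllib  # Python 2 stdlib, matching the codebase

    # Hypothetical deployment address; the real host/port are configuration-specific.
    params = urllib.urlencode({
        'ds': 'SOME_DATASET',   # assumed dataset shortname
        'longitude': -120.5,
        'latitude': 35.2,
        'day': 123,             # day of year; mutually exclusive with 'date'
        'allInTile': 'false'    # return only the pixel closest to the point
    })
    response = urllib.urlopen('http://localhost:8083/standardDeviation?%s' % params)
    print json.loads(response.read())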

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TestInitializer.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TestInitializer.py b/analysis/webservice/algorithms/TestInitializer.py
new file mode 100644
index 0000000..3f47c6a
--- /dev/null
+++ b/analysis/webservice/algorithms/TestInitializer.py
@@ -0,0 +1,16 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+
+from webservice.NexusHandler import NexusHandler, nexus_initializer
+from nexustiles.nexustiles import NexusTileService
+
+@nexus_initializer
+class TestInitializer:
+
+    def __init__(self):
+        pass
+
+    def init(self, config):
+        print "*** TEST INITIALIZATION ***"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TileSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py
new file mode 100644
index 0000000..d1fade8
--- /dev/null
+++ b/analysis/webservice/algorithms/TileSearch.py
@@ -0,0 +1,71 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults
+
+
+# @nexus_handler
+class ChunkSearchHandlerImpl(NexusHandler):
+    name = "Data Tile Search"
+    path = "/tiles"
+    description = "Lists dataset tiles given a geographical area and time range"
+    singleton = True
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "One dataset shortname"
+        },
+        "minLat": {
+            "name": "Minimum Latitude",
+            "type": "float",
+            "description": "Minimum (Southern) bounding box Latitude"
+        },
+        "maxLat": {
+            "name": "Maximum Latitude",
+            "type": "float",
+            "description": "Maximum (Northern) bounding box Latitude"
+        },
+        "minLon": {
+            "name": "Minimum Longitude",
+            "type": "float",
+            "description": "Minimum (Western) bounding box Longitude"
+        },
+        "maxLon": {
+            "name": "Maximum Longitude",
+            "type": "float",
+            "description": "Maximum (Eastern) bounding box Longitude"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "long integer",
+            "description": "Starting time in seconds since midnight Jan. 1st, 1970 UTC"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "long integer",
+            "description": "Ending time in seconds since midnight Jan. 1st, 1970 UTC"
+        }
+    }
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+
+    def calc(self, computeOptions, **args):
+        minLat = computeOptions.get_min_lat()
+        maxLat = computeOptions.get_max_lat()
+        minLon = computeOptions.get_min_lon()
+        maxLon = computeOptions.get_max_lon()
+        ds = computeOptions.get_dataset()[0]
+        startTime = computeOptions.get_start_time()
+        endTime = computeOptions.get_end_time()
+        # TODO update to expect tile objects back
+        res = [tile.get_summary() for tile in
+               self._tile_service.find_tiles_in_box(minLat, maxLat, minLon, maxLon, ds, startTime, endTime,
+                                                    fetch_data=False)]
+
+        res = NexusResults(results=res)
+        res.extendMeta(minLat, maxLat, minLon, maxLon, ds, startTime, endTime)
+        return res
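
Because this handler is a thin wrapper over the tile service, a standalone sketch of the underlying query may help. It uses only calls that appear in this commit (find_tiles_in_box with fetch_data=False, and get_summary); the bounding box, dataset name, and time range are placeholders:

    from nexustiles.nexustiles import NexusTileService

    tile_service = NexusTileService()
    # Placeholder box (lat -10..10, lon -30..-10) and epoch-second time range.
    tiles = tile_service.find_tiles_in_box(-10.0, 10.0, -30.0, -10.0,
                                           'SOME_DATASET', 0, 1451606400,
                                           fetch_data=False)
    for tile in tiles:
        print tile.get_summary()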

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeAvgMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py
new file mode 100644
index 0000000..5c76604
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeAvgMap.py
@@ -0,0 +1,265 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+# distutils: include_dirs = /usr/local/lib/python2.7/site-packages/cassandra
+import pyximport
+
+pyximport.install()
+
+import sys
+import numpy as np
+from time import time
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NoDataException
+from netCDF4 import Dataset
+
+#from mpl_toolkits.basemap import Basemap
+
+
+# @nexus_handler
+class TimeAvgMapHandlerImpl(NexusHandler):
+
+    name = "Time Average Map"
+    path = "/timeAvgMap"
+    description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=False)
+
+    def _find_native_resolution(self):
+        # Get a quick set of tiles (1 degree at center of box) at 1 time stamp
+        midLat = (self._minLat+self._maxLat)/2
+        midLon = (self._minLon+self._maxLon)/2
+        ntiles = 0
+        t = self._endTime
+        t_incr = 86400
+        while ntiles == 0:
+            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(midLat-0.5, midLat+0.5, midLon-0.5, midLon+0.5, ds=self._ds, start_time=t-t_incr, end_time=t)
+            ntiles = len(nexus_tiles)
+            print 'find_native_res: got %d tiles' % len(nexus_tiles)
+            sys.stdout.flush()
+            lat_res = 0.
+            lon_res = 0.
+            if ntiles > 0:
+                for tile in nexus_tiles:
+                    if lat_res < 1e-10:
+                        lats = tile.latitudes.compressed()
+                        if (len(lats) > 1):
+                            lat_res = lats[1] - lats[0]
+                    if lon_res < 1e-10:
+                        lons = tile.longitudes.compressed()
+                        if (len(lons) > 1):
+                            lon_res = lons[1] - lons[0]
+                    if (lat_res >= 1e-10) and (lon_res >= 1e-10):
+                        break
+            if (lat_res < 1e-10) or (lon_res < 1e-10):
+                t -= t_incr
+
+        self._latRes = lat_res
+        self._lonRes = lon_res
+
+    def _find_global_tile_set(self):
+        ntiles = 0
+        t = self._endTime
+        t_incr = 86400
+        while ntiles == 0:
+            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=self._ds, start_time=t-t_incr, end_time=t)
+            ntiles = len(nexus_tiles)
+            print 'find_global_tile_set got %d tiles' % ntiles
+            sys.stdout.flush()
+            t -= t_incr
+        return nexus_tiles
+
+    def _prune_tiles(self, nexus_tiles):
+        del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0]
+        for i in np.flipud(del_ind):
+            del nexus_tiles[i]
+
+    #@staticmethod
+    #def _map(tile_in):
+    def _map(self, tile_in):
+        print 'Started tile %s' % tile_in.section_spec
+        print 'tile lats = ', tile_in.latitudes
+        print 'tile lons = ', tile_in.longitudes
+        print 'tile = ', tile_in.data
+        sys.stdout.flush()
+        lats = tile_in.latitudes
+        lons = tile_in.longitudes
+        if len(lats) > 0 and len(lons) > 0:
+            min_lat = np.ma.min(lats)
+            max_lat = np.ma.max(lats)
+            min_lon = np.ma.min(lons)
+            max_lon = np.ma.max(lons)
+            good_inds_lat = np.where(lats.mask == False)
+            good_inds_lon = np.where(lons.mask == False)
+            min_y = np.min(good_inds_lat)
+            max_y = np.max(good_inds_lat)
+            min_x = np.min(good_inds_lon)
+            max_x = np.max(good_inds_lon)
+            tile_inbounds_shape = (max_y-min_y+1, max_x-min_x+1)
+            days_at_a_time = 90
+            t_incr = 86400 * days_at_a_time
+            avg_tile = np.ma.array(np.zeros(tile_inbounds_shape,
+                                            dtype=np.float64))
+            cnt_tile = np.ma.array(np.zeros(tile_inbounds_shape,
+                                            dtype=np.uint32))
+            t_start = self._startTime
+            while t_start <= self._endTime:
+                t_end = min(t_start+t_incr,self._endTime)
+                t1 = time()
+                print 'nexus call start at time %f' % t1
+                sys.stdout.flush()
+                nexus_tiles = self._tile_service.get_tiles_bounded_by_box(min_lat-self._latRes/2, max_lat+self._latRes/2, min_lon-self._lonRes/2, max_lon+self._lonRes/2, ds=self._ds, start_time=t_start, end_time=t_end)
+                t2 = time()
+                print 'nexus call end at time %f' % t2
+                print 'secs in nexus call: ', t2-t1
+                sys.stdout.flush()
+                self._prune_tiles(nexus_tiles)
+                print 't %d to %d - Got %d tiles' % (t_start, t_end, 
+                                                     len(nexus_tiles))
+                sys.stdout.flush()
+                for tile in nexus_tiles:
+                    tile.data.data[:,:] = np.nan_to_num(tile.data.data)
+                    avg_tile.data[:,:] += tile.data[0,
+                                                    min_y:max_y+1,
+                                                    min_x:max_x+1]
+                    cnt_tile.data[:,:] += (~tile.data.mask[0,
+                                                           min_y:max_y+1,
+                                                           min_x:max_x+1]).astype(np.uint8)
+                t_start = t_end + 1
+
+            print 'cnt_tile = ', cnt_tile
+            cnt_tile.mask = ~(cnt_tile.data.astype(bool))
+            avg_tile.mask = cnt_tile.mask
+            avg_tile /= cnt_tile
+            print 'Finished tile %s' % tile_in.section_spec
+            print 'Tile avg = ', avg_tile
+            sys.stdout.flush()
+        else:
+            avg_tile = None
+            min_lat = None
+            max_lat = None
+            min_lon = None
+            max_lon = None
+            print 'Tile %s outside of bounding box' % tile_in.section_spec
+            sys.stdout.flush()
+        return (avg_tile,min_lat,max_lat,min_lon,max_lon)
+
+    def _lat2ind(self,lat):
+        return int((lat-self._minLatCent)/self._latRes)
+
+    def _lon2ind(self,lon):
+        return int((lon-self._minLonCent)/self._lonRes)
+
+    def _create_nc_file(self, a):
+        print 'a=',a
+        print 'shape a = ', a.shape
+        sys.stdout.flush()
+        lat_dim, lon_dim = a.shape
+        rootgrp = Dataset("tam.nc", "w", format="NETCDF4")
+        rootgrp.createDimension("lat", lat_dim)
+        rootgrp.createDimension("lon", lon_dim)
+        rootgrp.createVariable("TRMM_3B42_daily_precipitation_V7", "f4",
+                               dimensions=("lat","lon",))
+        rootgrp.createVariable("lat", "f4", dimensions=("lat",))
+        rootgrp.createVariable("lon", "f4", dimensions=("lon",))
+        rootgrp.variables["TRMM_3B42_daily_precipitation_V7"][:,:] = a
+        rootgrp.variables["lat"][:] = np.linspace(self._minLatCent, 
+                                                  self._maxLatCent, lat_dim)
+        rootgrp.variables["lon"][:] = np.linspace(self._minLonCent,
+                                                  self._maxLonCent, lon_dim)
+        rootgrp.close()
+
+    def calc(self, computeOptions, **args):
+        """
+
+        :param computeOptions: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        self._minLat = float(computeOptions.get_min_lat())
+        self._maxLat = float(computeOptions.get_max_lat())
+        self._minLon = float(computeOptions.get_min_lon())
+        self._maxLon = float(computeOptions.get_max_lon())
+        self._ds = computeOptions.get_dataset()[0]
+        self._startTime = computeOptions.get_start_time()
+        self._endTime = computeOptions.get_end_time()
+
+        self._find_native_resolution()
+        print 'Using Native resolution: lat_res=%f, lon_res=%f' % (self._latRes, self._lonRes)
+        self._minLatCent = self._minLat + self._latRes / 2
+        self._minLonCent = self._minLon + self._lonRes / 2
+        nlats = int((self._maxLat-self._minLatCent)/self._latRes)+1
+        nlons = int((self._maxLon-self._minLonCent)/self._lonRes)+1
+        self._maxLatCent = self._minLatCent + (nlats-1) * self._latRes
+        self._maxLonCent = self._minLonCent + (nlons-1) * self._lonRes
+        print 'nlats=',nlats,'nlons=',nlons
+        print 'center lat range = %f to %f' % (self._minLatCent, 
+                                               self._maxLatCent)
+        print 'center lon range = %f to %f' % (self._minLonCent, 
+                                               self._maxLonCent)
+        sys.stdout.flush()
+        a = np.zeros((nlats, nlons),dtype=np.float64,order='C')
+
+        nexus_tiles = self._find_global_tile_set()
+        # print 'tiles:'
+        # for tile in nexus_tiles:
+        #     print tile.granule
+        #     print tile.section_spec
+        #     print 'lat:', tile.latitudes
+        #     print 'lon:', tile.longitudes
+
+        if len(nexus_tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        print 'Initially found %d tiles' % len(nexus_tiles)
+        sys.stdout.flush()
+        self._prune_tiles(nexus_tiles)
+        print 'Pruned to %d tiles' % len(nexus_tiles)
+        sys.stdout.flush()
+        #for tile in nexus_tiles:
+        #    print 'lats: ', tile.latitudes.compressed()
+        #    print 'lons: ', tile.longitudes.compressed()
+
+        avg_tiles = map(self._map, nexus_tiles)
+        print 'shape a = ', a.shape
+        sys.stdout.flush()
+        # The tiles below are NOT Nexus objects.  They are tuples
+        # with the time avg map data and lat-lon bounding box.
+        for tile in avg_tiles:
+            if tile is not None:
+                (tile_data, tile_min_lat, tile_max_lat, 
+                 tile_min_lon, tile_max_lon) = tile
+                print 'shape tile_data = ', tile_data.shape
+                print 'tile data mask = ', tile_data.mask
+                sys.stdout.flush()
+                # Compute map indices before branching so the log message in the
+                # else branch does not reference unbound variables.
+                y0 = self._lat2ind(tile_min_lat)
+                y1 = self._lat2ind(tile_max_lat)
+                x0 = self._lon2ind(tile_min_lon)
+                x1 = self._lon2ind(tile_max_lon)
+                if np.any(np.logical_not(tile_data.mask)):
+                    print 'writing tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \
+                        (tile_min_lat, tile_max_lat,
+                         tile_min_lon, tile_max_lon, y0, y1, x0, x1)
+                    sys.stdout.flush()
+                    a[y0:y1+1,x0:x1+1] = tile_data
+                else:
+                    print 'All pixels masked in tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \
+                        (tile_min_lat, tile_max_lat,
+                         tile_min_lon, tile_max_lon, y0, y1, x0, x1)
+                    sys.stdout.flush()
+
+        self._create_nc_file(a)
+
+        return TimeAvgMapResults(results={}, meta={}, computeOptions=computeOptions)
+
+
+class TimeAvgMapResults(NexusResults):
+
+    def __init__(self, results=None, meta=None, computeOptions=None):
+        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
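
The heart of _map above is a per-pixel running sum and observation count over time, divided at the end. A self-contained numpy sketch of that accumulation pattern, using synthetic masked data rather than NEXUS tiles:

    import numpy as np

    shape = (3, 4)
    avg = np.ma.array(np.zeros(shape, dtype=np.float64))
    cnt = np.ma.array(np.zeros(shape, dtype=np.uint32))

    for _ in range(2):  # two synthetic "time steps"
        data = np.ma.masked_less(np.random.rand(*shape), 0.3)
        avg.data[:, :] += data.filled(0.0)  # masked pixels contribute zero
        cnt.data[:, :] += (~np.ma.getmaskarray(data)).astype(np.uint8)

    cnt.mask = ~(cnt.data.astype(bool))  # pixels never observed stay masked
    avg.mask = cnt.mask
    avg /= cnt  # sum / count = time average
    print avg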

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeries.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py
new file mode 100644
index 0000000..858fea4
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeSeries.py
@@ -0,0 +1,549 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import calendar
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.dates as mdates
+import matplotlib.pyplot as plt
+import numpy as np
+import pytz
+import shapely.geometry
+import shapely.wkt
+from backports.functools_lru_cache import lru_cache
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
+from scipy import stats
+
+from webservice import Filtering as filtering
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class TimeSeriesHandlerImpl(NexusHandler):
+    name = "Time Series"
+    path = "/stats"
+    description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "comma-delimited string",
+            "description": "The dataset(s) Used to generate the Time Series. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "seasonalFilter": {
+            "name": "Compute Seasonal Cycle Filter",
+            "type": "boolean",
+            "description": "Flag used to specify if the seasonal averages should be computed during "
+                           "Time Series computation. Optional (Default: True)"
+        },
+        "lowPassFilter": {
+            "name": "Compute Low Pass Filter",
+            "type": "boolean",
+            "description": "Flag used to specify if a low pass filter should be computed during "
+                           "Time Series computation. Optional (Default: True)"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        try:
+            ds = request.get_dataset()
+            if not isinstance(ds, (list, tuple)):
+                ds = (ds,)
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be comma-delimited string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(reason="Cannot compute time series on a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter()
+        apply_low_pass_filter = request.get_apply_low_pass_filter()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               apply_seasonal_cycle_filter, apply_low_pass_filter
+
+    def calc(self, request, **args):
+        """
+
+        :param request: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+        apply_seasonal_cycle_filter, apply_low_pass_filter = self.parse_arguments(request)
+
+        resultsRaw = []
+
+        for shortName in ds:
+            results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(bounding_polygon,
+                                                                       shortName,
+                                                                       start_seconds_from_epoch,
+                                                                       end_seconds_from_epoch,
+                                                                       apply_seasonal_cycle_filter=apply_seasonal_cycle_filter,
+                                                                       apply_low_pass_filter=apply_low_pass_filter)
+            resultsRaw.append([results, meta])
+
+        the_time = datetime.now()
+        results = self._mergeResults(resultsRaw)
+
+        if len(ds) == 2:
+            try:
+                stats = TimeSeriesHandlerImpl.calculate_comparison_stats(results)
+            except Exception:
+                stats = {}
+                tb = traceback.format_exc()
+                self.log.warn("Error when calculating comparison stats:\n%s" % tb)
+        else:
+            stats = {}
+
+        meta = []
+        for singleRes in resultsRaw:
+            meta.append(singleRes[1])
+
+        res = TimeSeriesResults(results=results, meta=meta, stats=stats,
+                                computeOptions=None, minLat=bounding_polygon.bounds[1],
+                                maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
+                                maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
+                                endTime=end_seconds_from_epoch)
+
+        self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
+        return res
+
+    def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch,
+                                              end_seconds_from_epoch,
+                                              apply_seasonal_cycle_filter=True, apply_low_pass_filter=True):
+
+        the_time = datetime.now()
+        daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
+                                                                bounding_polygon.bounds[3],
+                                                                bounding_polygon.bounds[0],
+                                                                bounding_polygon.bounds[2],
+                                                                ds,
+                                                                start_seconds_from_epoch,
+                                                                end_seconds_from_epoch)
+        self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+        if len(daysinrange) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        the_time = datetime.now()
+        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+        results = []
+        if maxprocesses == 1:
+            calculator = TimeSeriesCalculator()
+            for dayinseconds in daysinrange:
+                result = calculator.calc_average_on_day(bounding_polygon.wkt, ds, dayinseconds)
+                results += [result] if result else []
+        else:
+            # Create a task to calc average difference for each day
+            manager = Manager()
+            work_queue = manager.Queue()
+            done_queue = manager.Queue()
+            for dayinseconds in daysinrange:
+                work_queue.put(
+                    ('calc_average_on_day', bounding_polygon.wkt, ds, dayinseconds))
+            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+            # Start new processes to handle the work
+            pool = Pool(maxprocesses)
+            [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+            pool.close()
+
+            # Collect the results as [(day (in ms), average difference for that day)]
+            for i in xrange(0, len(daysinrange)):
+                result = done_queue.get()
+                try:
+                    error_str = result['error']
+                    self.log.error(error_str)
+                    raise NexusProcessingException(reason="Error calculating average by day.")
+                except KeyError:
+                    pass
+
+                results += [result] if result else []
+
+            pool.terminate()
+            manager.shutdown()
+
+        results = sorted(results, key=lambda entry: entry["time"])
+        self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+        if apply_seasonal_cycle_filter:
+            the_time = datetime.now()
+            for result in results:
+                month = datetime.utcfromtimestamp(result['time']).month
+                month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt, ds)
+                seasonal_mean = result['mean'] - month_mean
+                seasonal_min = result['min'] - month_min
+                seasonal_max = result['max'] - month_max
+                result['meanSeasonal'] = seasonal_mean
+                result['minSeasonal'] = seasonal_min
+                result['maxSeasonal'] = seasonal_max
+            self.log.info(
+                "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+        the_time = datetime.now()
+        filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+        filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+        filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)
+
+        if apply_seasonal_cycle_filter and apply_low_pass_filter:
+            try:
+                filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
+                                         append="LowPass")
+                filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
+                                         append="LowPass")
+                filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
+                                         append="LowPass")
+            except Exception as e:
+                # If it doesn't work log the error but ignore it
+                tb = traceback.format_exc()
+                self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+
+        self.log.info(
+            "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+
+        return results, {}
+
+    @lru_cache()
+    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
+
+        min_date, max_date = self.get_min_max_date(ds=ds)
+
+        monthly_averages, monthly_counts = [], []
+        monthly_mins, monthly_maxes = [], []
+        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
+        for year in range(min_date.year, max_date.year + 1):
+            beginning_of_month = datetime(year, month, 1)
+            end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
+            start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
+            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
+            tile_stats = self._tile_service.find_tiles_in_polygon(bounding_polygon, ds, start, end,
+                                                                  fl=('id,'
+                                                                      'tile_avg_val_d,tile_count_i,'
+                                                                      'tile_min_val_d,tile_max_val_d,'
+                                                                      'tile_min_lat,tile_max_lat,'
+                                                                      'tile_min_lon,tile_max_lon'),
+                                                                  fetch_data=False)
+            if len(tile_stats) == 0:
+                continue
+
+            # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
+            border_tiles, inner_tiles = [], []
+            for tile in tile_stats:
+                tile_box = shapely.geometry.box(tile.bbox.min_lon, tile.bbox.min_lat,
+                                                tile.bbox.max_lon, tile.bbox.max_lat)
+                if bounding_polygon.contains(tile_box):
+                    inner_tiles.append(tile)
+                else:
+                    border_tiles.append(tile)
+
+            # We can use the stats of the inner tiles directly
+            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
+            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
+            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
+            tile_counts = [tile.tile_stats.count for tile in inner_tiles]
+
+            # Border tiles need to have their data loaded, masked, and their stats recalculated
+            border_tiles = list(self._tile_service.fetch_data_for_tiles(*border_tiles))
+            border_tiles = self._tile_service.mask_tiles_to_polygon(bounding_polygon, border_tiles)
+            for tile in border_tiles:
+                tile.update_stats()
+                tile_means.append(tile.tile_stats.mean)
+                tile_mins.append(tile.tile_stats.min)
+                tile_maxes.append(tile.tile_stats.max)
+                tile_counts.append(tile.tile_stats.count)
+
+            tile_means = np.array(tile_means)
+            tile_mins = np.array(tile_mins)
+            tile_maxes = np.array(tile_maxes)
+            tile_counts = np.array(tile_counts)
+
+            sum_tile_counts = np.sum(tile_counts) * 1.0
+
+            monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
+            monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
+            monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
+            monthly_counts += [sum_tile_counts]
+
+        count_sum = np.sum(monthly_counts) * 1.0
+        weights = np.array(monthly_counts) / count_sum
+
+        # Weight each month's aggregate by its observation count; order matches the
+        # (month_mean, month_max, month_min) unpacking at the call site.
+        return np.average(monthly_averages, None, weights).item(), \
+               np.average(monthly_maxes, None, weights).item(), \
+               np.average(monthly_mins, None, weights).item()
+
+    @lru_cache()
+    def get_min_max_date(self, ds=None):
+        min_date = pytz.timezone('UTC').localize(
+            datetime.utcfromtimestamp(self._tile_service.get_min_time([], ds=ds)))
+        max_date = pytz.timezone('UTC').localize(
+            datetime.utcfromtimestamp(self._tile_service.get_max_time([], ds=ds)))
+
+        return min_date.date(), max_date.date()
+
+    @staticmethod
+    def calculate_comparison_stats(results):
+        xy = [[], []]
+
+        for item in results:
+            if len(item) == 2:
+                xy[item[0]["ds"]].append(item[0]["mean"])
+                xy[item[1]["ds"]].append(item[1]["mean"])
+
+        slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
+        comparisonStats = {
+            "slope": slope,
+            "intercept": intercept,
+            "r": r_value,
+            "p": p_value,
+            "err": std_err
+        }
+
+        return comparisonStats
+
+
+class TimeSeriesResults(NexusResults):
+    LINE_PLOT = "line"
+    SCATTER_PLOT = "scatter"
+
+    __SERIES_COLORS = ['red', 'blue']
+
+    def toImage(self):
+
+        plot_type = self.computeOptions().get_plot_type()
+
+        if plot_type == TimeSeriesResults.LINE_PLOT or plot_type == "default":
+            return self.createLinePlot()
+        elif plot_type == TimeSeriesResults.SCATTER_PLOT:
+            return self.createScatterPlot()
+        else:
+            raise Exception("Invalid or unsupported time series plot specified")
+
+    def createScatterPlot(self):
+        timeSeries = []
+        series0 = []
+        series1 = []
+
+        res = self.results()
+        meta = self.meta()
+
+        computeOptions = self.computeOptions()
+        plotSeries = computeOptions.get_plot_series() if computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "mean"
+
+        for m in res:
+            if len(m) == 2:
+                timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
+                series0.append(m[0][plotSeries])
+                series1.append(m[1][plotSeries])
+
+        title = ', '.join(set([m['title'] for m in meta]))
+        sources = ', '.join(set([m['source'] for m in meta]))
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        fig, ax = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        ax.scatter(series0, series1, alpha=0.5)
+        ax.set_xlabel(meta[0]['units'])
+        ax.set_ylabel(meta[1]['units'])
+        ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+
+        par = np.polyfit(series0, series1, 1, full=True)
+        slope = par[0][0]
+        intercept = par[0][1]
+        xl = [min(series0), max(series0)]
+        yl = [slope * xx + intercept for xx in xl]
+        plt.plot(xl, yl, '-r')
+
+        ax.grid(True)
+        fig.tight_layout()
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+    def createLinePlot(self):
+        nseries = len(self.meta())
+        res = self.results()
+        meta = self.meta()
+
+        timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]
+
+        means = [[np.nan] * len(res) for n in range(0, nseries)]
+
+        computeOptions = self.computeOptions()
+        plotSeries = computeOptions.get_plot_series() if computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "mean"
+
+        for n in range(0, len(res)):
+            timeSlot = res[n]
+            for seriesValues in timeSlot:
+                means[seriesValues['ds']][n] = seriesValues[plotSeries]
+
+        x = timeSeries
+
+        fig, axMain = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        fig.autofmt_xdate()
+
+        title = ', '.join(set([m['title'] for m in meta]))
+        sources = ', '.join(set([m['source'] for m in meta]))
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+        axMain.set_xlabel('Date')
+        axMain.grid(True)
+        axMain.xaxis.set_major_locator(mdates.YearLocator())
+        axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
+        axMain.xaxis.set_minor_locator(mdates.MonthLocator())
+        axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')
+
+        plots = []
+
+        for n in range(0, nseries):
+            if n == 0:
+                ax = axMain
+            else:
+                ax = ax.twinx()
+
+            plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title'])
+            ax.set_ylabel(meta[n]['units'])
+
+        labs = [l.get_label() for l in plots]
+        axMain.legend(plots, labs, loc=0)
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+
+class TimeSeriesCalculator(object):
+    def __init__(self):
+        self.__tile_service = NexusTileService()
+
+    def calc_average_on_day(self, bounding_polygon_wkt, dataset, timeinseconds):
+        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
+        ds1_nexus_tiles = self.__tile_service.get_tiles_bounded_by_polygon_at_time(bounding_polygon,
+                                                                                   dataset,
+                                                                                   timeinseconds)
+
+        # If all data ends up getting masked, ds1_nexus_tiles will be empty
+        if len(ds1_nexus_tiles) == 0:
+            return {}
+
+        tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles])
+        data_min = np.ma.min(tile_data_agg)
+        data_max = np.ma.max(tile_data_agg)
+        daily_mean = np.ma.mean(tile_data_agg).item()
+        data_count = np.ma.count(tile_data_agg)
+        try:
+            data_count = data_count.item()
+        except AttributeError:
+            pass
+        data_std = np.ma.std(tile_data_agg)
+
+        # Return Stats by day
+        stat = {
+            'min': data_min,
+            'max': data_max,
+            'mean': daily_mean,
+            'cnt': data_count,
+            'std': data_std,
+            'time': int(timeinseconds)
+        }
+        return stat
+
+
+def pool_worker(work_queue, done_queue):
+    try:
+        calculator = TimeSeriesCalculator()
+
+        for work in iter(work_queue.get, SENTINEL):
+            scifunction = work[0]
+            args = work[1:]
+            result = calculator.__getattribute__(scifunction)(*args)
+            done_queue.put(result)
+
+    except Exception as e:
+        e_str = traceback.format_exc()
+        done_queue.put({'error': e_str})
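
The queue-plus-sentinel fan-out used by getTimeSeriesStatsForBoxSingleDataSet and pool_worker above reduces to the following standalone sketch; the doubling task is a stand-in for calc_average_on_day:

    from multiprocessing.dummy import Pool, Manager

    SENTINEL = 'STOP'

    def worker(work_queue, done_queue):
        # Consume tasks until the sentinel is seen, as pool_worker does above.
        for work in iter(work_queue.get, SENTINEL):
            done_queue.put(work * 2)

    manager = Manager()
    work_queue = manager.Queue()
    done_queue = manager.Queue()

    tasks = range(10)
    for task in tasks:
        work_queue.put(task)

    nworkers = 4
    for _ in xrange(nworkers):
        work_queue.put(SENTINEL)  # one sentinel per worker so each exits cleanly

    pool = Pool(nworkers)
    for _ in xrange(nworkers):
        pool.apply_async(worker, (work_queue, done_queue))
    pool.close()

    results = sorted(done_queue.get() for _ in tasks)
    pool.terminate()
    print results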

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeriesSolr.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py
new file mode 100644
index 0000000..1da55b2
--- /dev/null
+++ b/analysis/webservice/algorithms/TimeSeriesSolr.py
@@ -0,0 +1,342 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import sys
+import traceback
+import logging
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.dates as mdates
+import matplotlib.pyplot as plt
+import numpy as np
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from nexustiles.nexustiles import NexusTileService
+from scipy import stats
+
+from webservice import Filtering as filt
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+SENTINEL = 'STOP'
+
+
+@nexus_handler
+class TimeSeriesHandlerImpl(NexusHandler):
+    name = "Time Series Solr"
+    path = "/statsSolr"
+    description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+        self.log = logging.getLogger(__name__)
+
+    def calc(self, computeOptions, **args):
+        """
+
+        :param computeOptions: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        ds = computeOptions.get_dataset()
+
+        if not isinstance(ds, (list, tuple)):
+            ds = (ds,)
+
+        resultsRaw = []
+
+        for shortName in ds:
+            results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(computeOptions.get_min_lat(),
+                                                                       computeOptions.get_max_lat(),
+                                                                       computeOptions.get_min_lon(),
+                                                                       computeOptions.get_max_lon(),
+                                                                       shortName,
+                                                                       computeOptions.get_start_time(),
+                                                                       computeOptions.get_end_time(),
+                                                                       computeOptions.get_apply_seasonal_cycle_filter(),
+                                                                       computeOptions.get_apply_low_pass_filter())
+            resultsRaw.append([results, meta])
+
+        results = self._mergeResults(resultsRaw)
+
+        if len(ds) == 2:
+            stats = self.calculateComparisonStats(results, suffix="")
+            if computeOptions.get_apply_seasonal_cycle_filter():
+                s = self.calculateComparisonStats(results, suffix="Seasonal")
+                stats = self._mergeDicts(stats, s)
+            if computeOptions.get_apply_low_pass_filter():
+                s = self.calculateComparisonStats(results, suffix="LowPass")
+                stats = self._mergeDicts(stats, s)
+            if computeOptions.get_apply_seasonal_cycle_filter() and computeOptions.get_apply_low_pass_filter():
+                s = self.calculateComparisonStats(results, suffix="SeasonalLowPass")
+                stats = self._mergeDicts(stats, s)
+        else:
+            stats = {}
+
+        meta = []
+        for singleRes in resultsRaw:
+            meta.append(singleRes[1])
+
+        res = TimeSeriesResults(results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+        return res
+
+    def getTimeSeriesStatsForBoxSingleDataSet(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0, end_time=-1,
+                                              applySeasonalFilter=True, applyLowPass=True):
+
+        daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+                                                                end_time)
+
+        if len(daysinrange) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+        results = []
+        if maxprocesses == 1:
+            calculator = TimeSeriesCalculator()
+            for dayinseconds in daysinrange:
+                result = calculator.calc_average_on_day(min_lat, max_lat, min_lon, max_lon, ds, dayinseconds)
+                results.append(result)
+        else:
+            # Create a task to calc average difference for each day
+            manager = Manager()
+            work_queue = manager.Queue()
+            done_queue = manager.Queue()
+            for dayinseconds in daysinrange:
+                work_queue.put(
+                    ('calc_average_on_day', min_lat, max_lat, min_lon, max_lon, ds, dayinseconds))
+            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+            # Start new processes to handle the work
+            pool = Pool(maxprocesses)
+            [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+            pool.close()
+
+            # Collect the results as [(day (in ms), average difference for that day)]
+            for i in xrange(0, len(daysinrange)):
+                result = done_queue.get()
+                try:
+                    error_str = result['error']
+                    self.log.error(error_str)
+                    raise NexusProcessingException(reason="Error calculating average by day.")
+                except KeyError:
+                    pass
+
+                results.append(result)
+
+            pool.terminate()
+            manager.shutdown()
+
+        results = sorted(results, key=lambda entry: entry["time"])
+
+        filt.applyAllFiltersOnField(results, 'mean', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+        filt.applyAllFiltersOnField(results, 'max', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+        filt.applyAllFiltersOnField(results, 'min', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass)
+
+        return results, {}
+
+    def calculateComparisonStats(self, results, suffix=""):
+
+        xy = [[], []]
+
+        for item in results:
+            if len(item) == 2:
+                xy[item[0]["ds"]].append(item[0]["mean%s" % suffix])
+                xy[item[1]["ds"]].append(item[1]["mean%s" % suffix])
+
+        slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
+        comparisonStats = {
+            "slope%s" % suffix: slope,
+            "intercept%s" % suffix: intercept,
+            "r%s" % suffix: r_value,
+            "p%s" % suffix: p_value,
+            "err%s" % suffix: std_err
+        }
+
+        return comparisonStats
+
+
+class TimeSeriesResults(NexusResults):
+    LINE_PLOT = "line"
+    SCATTER_PLOT = "scatter"
+
+    __SERIES_COLORS = ['red', 'blue']
+
+    def __init__(self, results=None, meta=None, stats=None, computeOptions=None):
+        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+
+    def toImage(self):
+
+        plot_type = self.computeOptions().get_plot_type()
+
+        if plot_type == TimeSeriesResults.LINE_PLOT or plot_type == "default":
+            return self.createLinePlot()
+        elif plot_type == TimeSeriesResults.SCATTER_PLOT:
+            return self.createScatterPlot()
+        else:
+            raise Exception("Invalid or unsupported time series plot specified")
+
+    def createScatterPlot(self):
+        timeSeries = []
+        series0 = []
+        series1 = []
+
+        res = self.results()
+        meta = self.meta()
+
+        computeOptions = self.computeOptions()
+        plotSeries = computeOptions.get_plot_series() if computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "mean"
+
+        for m in res:
+            if len(m) == 2:
+                timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
+                series0.append(m[0][plotSeries])
+                series1.append(m[1][plotSeries])
+
+        title = ', '.join(set([m['title'] for m in meta]))
+        sources = ', '.join(set([m['source'] for m in meta]))
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        fig, ax = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        ax.scatter(series0, series1, alpha=0.5)
+        ax.set_xlabel(meta[0]['units'])
+        ax.set_ylabel(meta[1]['units'])
+        ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+
+        par = np.polyfit(series0, series1, 1, full=True)
+        slope = par[0][0]
+        intercept = par[0][1]
+        xl = [min(series0), max(series0)]
+        yl = [slope * xx + intercept for xx in xl]
+        plt.plot(xl, yl, '-r')
+
+        # r = self.stats()["r"]
+        # plt.text(0.5, 0.5, "r = foo")
+
+        ax.grid(True)
+        fig.tight_layout()
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+    def createLinePlot(self):
+        nseries = len(self.meta())
+        res = self.results()
+        meta = self.meta()
+
+        timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]
+
+        means = [[np.nan] * len(res) for n in range(0, nseries)]
+
+        computeOptions = self.computeOptions()
+        plotSeries = computeOptions.get_plot_series() if computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "mean"
+
+        for n in range(0, len(res)):
+            timeSlot = res[n]
+            for seriesValues in timeSlot:
+                means[seriesValues['ds']][n] = seriesValues[plotSeries]
+
+        x = timeSeries
+
+        fig, axMain = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        fig.autofmt_xdate()
+
+        title = ', '.join(set([m['title'] for m in meta]))
+        sources = ', '.join(set([m['source'] for m in meta]))
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
+        axMain.set_xlabel('Date')
+        axMain.grid(True)
+        axMain.xaxis.set_major_locator(mdates.YearLocator())
+        axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
+        axMain.xaxis.set_minor_locator(mdates.MonthLocator())
+        axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')
+
+        plots = []
+
+        for n in range(0, nseries):
+            if n == 0:
+                ax = axMain
+            else:
+                ax = ax.twinx()
+
+            plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title'])
+            ax.set_ylabel(meta[n]['units'])
+
+        labs = [l.get_label() for l in plots]
+        axMain.legend(plots, labs, loc=0)
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+
+class TimeSeriesCalculator(object):
+    def __init__(self):
+        self.__tile_service = NexusTileService()
+
+    def calc_average_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset, timeinseconds):
+        # Get stats using solr only
+        ds1_nexus_tiles_stats = self.__tile_service.get_stats_within_box_at_time(min_lat, max_lat, min_lon, max_lon,
+                                                                               dataset,
+                                                                               timeinseconds)
+
+        data_min_within = min([tile["tile_min_val_d"] for tile in ds1_nexus_tiles_stats])
+        data_max_within = max([tile["tile_max_val_d"] for tile in ds1_nexus_tiles_stats])
+        data_sum_within = sum([tile["product(tile_avg_val_d, tile_count_i)"] for tile in ds1_nexus_tiles_stats])
+        data_count_within = sum([tile["tile_count_i"] for tile in ds1_nexus_tiles_stats])
+
+        # Get boundary tiles and calculate stats
+        ds1_nexus_tiles = self.__tile_service.get_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon,
+                                                                               dataset,
+                                                                               timeinseconds)
+
+        tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles])
+        data_min_boundary = np.ma.min(tile_data_agg)
+        data_max_boundary = np.ma.max(tile_data_agg)
+        #daily_mean = np.ma.mean(tile_data_agg).item()
+        data_sum_boundary = np.ma.sum(tile_data_agg)
+        data_count_boundary = np.ma.count(tile_data_agg).item()
+        #data_std = np.ma.std(tile_data_agg)
+
+        # Combine stats
+        data_min = min(data_min_within, data_min_boundary)
+        data_max = max(data_max_within, data_max_boundary)
+        data_count = data_count_within + data_count_boundary
+        daily_mean = (data_sum_within + data_sum_boundary) / data_count
+        data_std = 0
+
+        # Return Stats by day
+        stat = {
+            'min': data_min,
+            'max': data_max,
+            'mean': daily_mean,
+            'cnt': data_count,
+            'std': data_std,
+            'time': int(timeinseconds)
+        }
+        return stat
+
+
+def pool_worker(work_queue, done_queue):
+    try:
+        calculator = TimeSeriesCalculator()
+
+        for work in iter(work_queue.get, SENTINEL):
+            scifunction = work[0]
+            args = work[1:]
+            result = calculator.__getattribute__(scifunction)(*args)
+            done_queue.put(result)
+    except Exception as e:
+        e_str = traceback.format_exc()
+        done_queue.put({'error': e_str})
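
The "Combine stats" step in calc_average_on_day above merges partial aggregates from the Solr-indexed interior tiles with aggregates recomputed over the masked boundary tiles. A minimal sketch with illustrative numbers:

    # Partial aggregates from Solr for tiles fully inside the box ...
    interior = {'min': 2.0, 'max': 9.5, 'sum': 120.0, 'count': 40}
    # ... and recomputed over boundary tiles whose data was loaded and masked.
    boundary = {'min': 1.5, 'max': 8.0, 'sum': 33.0, 'count': 12}

    combined_count = interior['count'] + boundary['count']
    combined = {
        'min': min(interior['min'], boundary['min']),
        'max': max(interior['max'], boundary['max']),
        'cnt': combined_count,
        # Means combine through sums, not by averaging the two means.
        'mean': (interior['sum'] + boundary['sum']) / combined_count,
    }
    print combined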

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/__init__.py b/analysis/webservice/algorithms/__init__.py
new file mode 100644
index 0000000..fb5ecc8
--- /dev/null
+++ b/analysis/webservice/algorithms/__init__.py
@@ -0,0 +1,20 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import Capabilities
+import CorrelationMap
+import DailyDifferenceAverage
+import DataInBoundsSearch
+import DataSeriesList
+import DelayTest
+import ErrorTosserTest
+import Heartbeat
+import HofMoeller
+import LongitudeLatitudeMap
+import TestInitializer
+import TileSearch
+import TimeAvgMap
+import TimeSeries
+import TimeSeriesSolr
+import StandardDeviationSearch
\ No newline at end of file