You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:27 UTC
[50/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all
code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/tests/algorithms_spark/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py
new file mode 100644
index 0000000..bd9282c
--- /dev/null
+++ b/analysis/tests/algorithms_spark/__init__.py
@@ -0,0 +1,4 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/Filtering.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/Filtering.py b/analysis/webservice/Filtering.py
new file mode 100644
index 0000000..08b976e
--- /dev/null
+++ b/analysis/webservice/Filtering.py
@@ -0,0 +1,157 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+import math
+
+import logging
+import traceback
+
+import numpy as np
+from scipy import stats
+from scipy.fftpack import fft
+from scipy.ndimage.interpolation import zoom
+from scipy.interpolate import UnivariateSpline
+from scipy.signal import wiener, filtfilt, butter, gaussian, freqz
+from scipy.ndimage import filters
+
+log = logging.getLogger('Filtering')
+
+
+def __fieldToList(results, field):
+ a = np.zeros(len(results))
+ for n in range(0, len(results)):
+ a[n] = results[n][field]
+ return a
+
+
+def __listToField(results, l, field):
+ if results is None or l is None:
+ raise Exception("Cannot transpose values if they're null")
+
+ if not len(results) == len(l):
+ raise Exception("Cannot transpose values between lists of inequal length")
+
+ for n in range(0, len(results)):
+ results[n][field] = l[n]
+
+
+def applySeasonalCycleFilter1d(l):
+ if len(l) <= 12:
+ return l
+
+ for a in range(0, 12):
+ values = []
+ for b in range(a, len(l), 12):
+ values.append(l[b])
+ avg = np.average(values)
+ for b in range(a, len(l), 12):
+ l[b] -= avg
+ return l
+
+
+def applySeasonalCycleFilter2d(l):
+ return l
+
+
+'''
+ Implements monthly filtering of seasonal cycles.
+'''
+
+
+def applySeasonalCycleFilter(l):
+ if len(np.shape(l)) == 1:
+ return applySeasonalCycleFilter1d(l)
+ elif len(np.shape(l)) == 2:
+ return applySeasonalCycleFilter2d(l)
+ else:
+ raise Exception("Cannot apply seasonal cycle filter: Unsupported array shape")
+
+
+def applySeasonalCycleFilterOnResultsField(results, field):
+ l = __fieldToList(results, field)
+ applySeasonalCycleFilter(l)
+ __listToField(results, l, field)
+
+
+def applySeasonalCycleFilterOnResults(results):
+ [applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
+
+
+'''
+http://www.nehalemlabs.net/prototype/blog/2013/04/05/an-introduction-to-smoothing-time-series-in-python-part-i-filtering-theory/
+'''
+
+
+def applyLowPassFilter(y, lowcut=12.0, order=9.0):
+ if len(y) - 12 <= lowcut:
+ lowcut = 3
+ nyq = 0.5 * len(y)
+ low = lowcut / nyq
+ # high = highcut / nyq
+ b, a = butter(order, low)
+ m = min([len(y), len(a), len(b)])
+ padlen = 30 if m >= 30 else m
+ fl = filtfilt(b, a, y, padlen=padlen)
+ return fl
+
+
+def applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=False, append=""):
+ x = __fieldToList(results, field)
+
+ if applySeasonal:
+ x = applySeasonalCycleFilter(x)
+ if applyLowPass:
+ x = applyLowPassFilter(x)
+ __listToField(results, x, "%s%s" % (field, append))
+
+
+def applyAllFiltersOnField(results, field, applySeasonal=True, applyLowPass=True):
+ try:
+ if applySeasonal:
+ applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=False, append="Seasonal")
+ except Exception as e:
+ # If it doesn't work log the error but ignore it
+ tb = traceback.format_exc()
+ log.warn("Error calculating Seasonal filter:\n%s" % tb)
+
+ try:
+ if applyLowPass:
+ applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=True, append="LowPass")
+ except Exception as e:
+ # If it doesn't work log the error but ignore it
+ tb = traceback.format_exc()
+ log.warn("Error calculating LowPass filter:\n%s" % tb)
+
+ try:
+ if applySeasonal and applyLowPass:
+ applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=True, append="SeasonalLowPass")
+ except Exception as e:
+ # If it doesn't work log the error but ignore it
+ tb = traceback.format_exc()
+ log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+
+
+'''
+class ResultsFilter(object):
+
+ def __init__(self):
+ pass
+
+ def filter(self, results, append, **kwargs):
+ pass
+
+
+
+class SeasonalCycleFilter(ResultsFilter):
+
+ def filter(self, results, append, **kwargs):
+ [applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
+
+if __name__ == "__main__":
+
+ foo = "bar"
+ f = ResultsFilter()
+ f.test("Tester", blah=foo)
+'''
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/LayerConfig.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/LayerConfig.py b/analysis/webservice/LayerConfig.py
new file mode 100644
index 0000000..e271b72
--- /dev/null
+++ b/analysis/webservice/LayerConfig.py
@@ -0,0 +1,61 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+
+ALL_LAYERS_ENABLED = True
+LAYERS = []
+LAYERS.append({"shortName":"NCDC-L4LRblend-GLOB-AVHRR_OI", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SSH_alti_1deg_1mon", "envs": ("ALL",)})
+
+
+LAYERS.append({"shortName":"SIacSubl_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"PHIBOT_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIhsnow_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIheff_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceFWflx_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceQnet_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"MXLDEPTH_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIatmQnt_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceSPflx_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceSPDep_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIarea_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"ETAN_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"sIceLoad_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceQsw_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIsnPrcp_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"DETADT2_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TFLUX_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SItflux_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SFLUX_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TELLUS_GRACE_MASCON_GRID_RL05_V1_LAND", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TELLUS_GRACE_MASCON_GRID_RL05_V1_OCEAN", "envs": ("ALL",)})
+LAYERS.append({"shortName":"Sea_Surface_Anomalies", "envs": ("DEV",)})
+
+LAYERS.append({"shortName":"JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1", "envs": ("ALL",)})
+
+
+def isLayerEnabled(shortName, env):
+ if ALL_LAYERS_ENABLED:
+ return True
+ if env == None:
+ env = "PROD"
+ env = env.upper()
+ if env == "DEV":
+ return True
+ for layer in LAYERS:
+ if layer["shortName"] == shortName and ("ALL" in layer["envs"] or env.upper() in layer["envs"]):
+ return True
+ return False
+
+
+if __name__ == "__main__":
+
+ print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", None)
+ print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "PROD")
+ print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "SIT")
+
+ print isLayerEnabled("TFLUX_ECCO_version4_release1", None)
+ print isLayerEnabled("TFLUX_ECCO_version4_release1", "PROD")
+ print isLayerEnabled("TFLUX_ECCO_version4_release1", "SIT")
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/NexusHandler.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
new file mode 100644
index 0000000..1cad3cf
--- /dev/null
+++ b/analysis/webservice/NexusHandler.py
@@ -0,0 +1,554 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import sys
+import numpy as np
+import logging
+import time
+import types
+from datetime import datetime
+from netCDF4 import Dataset
+from nexustiles.nexustiles import NexusTileService
+from webservice.webmodel import NexusProcessingException
+
+AVAILABLE_HANDLERS = []
+AVAILABLE_INITIALIZERS = []
+
+
+def nexus_initializer(clazz):
+ log = logging.getLogger(__name__)
+ try:
+ wrapper = NexusInitializerWrapper(clazz)
+ log.info("Adding initializer '%s'" % wrapper.clazz())
+ AVAILABLE_INITIALIZERS.append(wrapper)
+ except Exception as ex:
+ log.warn("Initializer '%s' failed to load (reason: %s)" % (clazz, ex.message), exc_info=True)
+ return clazz
+
+
+def nexus_handler(clazz):
+ log = logging.getLogger(__name__)
+ try:
+ wrapper = AlgorithmModuleWrapper(clazz)
+ log.info("Adding algorithm module '%s' with path '%s' (%s)" % (wrapper.name(), wrapper.path(), wrapper.clazz()))
+ AVAILABLE_HANDLERS.append(wrapper)
+ except Exception as ex:
+ log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, ex.message), exc_info=True)
+ return clazz
+
+
+DEFAULT_PARAMETERS_SPEC = {
+ "ds": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "One or more comma-separated dataset shortnames"
+ },
+ "minLat": {
+ "name": "Minimum Latitude",
+ "type": "float",
+ "description": "Minimum (Southern) bounding box Latitude"
+ },
+ "maxLat": {
+ "name": "Maximum Latitude",
+ "type": "float",
+ "description": "Maximum (Northern) bounding box Latitude"
+ },
+ "minLon": {
+ "name": "Minimum Longitude",
+ "type": "float",
+ "description": "Minimum (Western) bounding box Longitude"
+ },
+ "maxLon": {
+ "name": "Maximum Longitude",
+ "type": "float",
+ "description": "Maximum (Eastern) bounding box Longitude"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "long integer",
+ "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "long integer",
+ "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
+ },
+ "lowPassFilter": {
+ "name": "Apply Low Pass Filter",
+ "type": "boolean",
+ "description": "Specifies whether to apply a low pass filter on the analytics results"
+ },
+ "seasonalFilter": {
+ "name": "Apply Seasonal Filter",
+ "type": "boolean",
+ "description": "Specified whether to apply a seasonal cycle filter on the analytics results"
+ }
+}
+
+
+class NexusInitializerWrapper:
+ def __init__(self, clazz):
+ self.__log = logging.getLogger(__name__)
+ self.__hasBeenRun = False
+ self.__clazz = clazz
+ self.validate()
+
+ def validate(self):
+ if "init" not in self.__clazz.__dict__ or not type(self.__clazz.__dict__["init"]) == types.FunctionType:
+ raise Exception("Method 'init' has not been declared")
+
+ def clazz(self):
+ return self.__clazz
+
+ def hasBeenRun(self):
+ return self.__hasBeenRun
+
+ def init(self, config):
+ if not self.__hasBeenRun:
+ self.__hasBeenRun = True
+ instance = self.__clazz()
+ instance.init(config)
+ else:
+ self.log("Initializer '%s' has already been run" % self.__clazz)
+
+
+class AlgorithmModuleWrapper:
+ def __init__(self, clazz):
+ self.__instance = None
+ self.__clazz = clazz
+ self.validate()
+
+ def validate(self):
+ if "calc" not in self.__clazz.__dict__ or not type(self.__clazz.__dict__["calc"]) == types.FunctionType:
+ raise Exception("Method 'calc' has not been declared")
+
+ if "path" not in self.__clazz.__dict__:
+ raise Exception("Property 'path' has not been defined")
+
+ if "name" not in self.__clazz.__dict__:
+ raise Exception("Property 'name' has not been defined")
+
+ if "description" not in self.__clazz.__dict__:
+ raise Exception("Property 'description' has not been defined")
+
+ if "params" not in self.__clazz.__dict__:
+ raise Exception("Property 'params' has not been defined")
+
+ def clazz(self):
+ return self.__clazz
+
+ def name(self):
+ return self.__clazz.name
+
+ def path(self):
+ return self.__clazz.path
+
+ def description(self):
+ return self.__clazz.description
+
+ def params(self):
+ return self.__clazz.params
+
+ def instance(self, algorithm_config=None, sc=None):
+ if "singleton" in self.__clazz.__dict__ and self.__clazz.__dict__["singleton"] is True:
+ if self.__instance is None:
+ self.__instance = self.__clazz()
+
+ try:
+ self.__instance.set_config(algorithm_config)
+ except AttributeError:
+ pass
+
+ try:
+ self.__instance.set_spark_context(sc)
+ except AttributeError:
+ pass
+
+ return self.__instance
+ else:
+ instance = self.__clazz()
+
+ try:
+ instance.set_config(algorithm_config)
+ except AttributeError:
+ pass
+
+ try:
+ self.__instance.set_spark_context(sc)
+ except AttributeError:
+ pass
+ return instance
+
+ def isValid(self):
+ try:
+ self.validate()
+ return True
+ except Exception as ex:
+ return False
+
+
+class CalcHandler(object):
+ def calc(self, computeOptions, **args):
+ raise Exception("calc() not yet implemented")
+
+
+class NexusHandler(CalcHandler):
+ def __init__(self, skipCassandra=False, skipSolr=False):
+ CalcHandler.__init__(self)
+
+ self.algorithm_config = None
+ self._tile_service = NexusTileService(skipCassandra, skipSolr)
+
+ def set_config(self, algorithm_config):
+ self.algorithm_config = algorithm_config
+
+ def _mergeDicts(self, x, y):
+ z = x.copy()
+ z.update(y)
+ return z
+
+ def _now(self):
+ millis = int(round(time.time() * 1000))
+ return millis
+
+ def _mergeDataSeries(self, resultsData, dataNum, resultsMap):
+
+ for entry in resultsData:
+
+ #frmtdTime = datetime.fromtimestamp(entry["time"] ).strftime("%Y-%m")
+ frmtdTime = entry["time"]
+
+ if not frmtdTime in resultsMap:
+ resultsMap[frmtdTime] = []
+ entry["ds"] = dataNum
+ resultsMap[frmtdTime].append(entry)
+
+ def _resultsMapToList(self, resultsMap):
+ resultsList = []
+ for key, value in resultsMap.iteritems():
+ resultsList.append(value)
+
+ resultsList = sorted(resultsList, key=lambda entry: entry[0]["time"])
+ return resultsList
+
+ def _mergeResults(self, resultsRaw):
+ resultsMap = {}
+
+ for i in range(0, len(resultsRaw)):
+ resultsSeries = resultsRaw[i]
+ resultsData = resultsSeries[0]
+ self._mergeDataSeries(resultsData, i, resultsMap)
+
+ resultsList = self._resultsMapToList(resultsMap)
+ return resultsList
+
+
+class SparkHandler(NexusHandler):
+ class SparkJobContext(object):
+
+ class MaxConcurrentJobsReached(Exception):
+ def __init__(self, *args, **kwargs):
+ Exception.__init__(self, *args, **kwargs)
+
+ def __init__(self, job_stack):
+ self.spark_job_stack = job_stack
+ self.job_name = None
+ self.log = logging.getLogger(__name__)
+
+ def __enter__(self):
+ try:
+ self.job_name = self.spark_job_stack.pop()
+ self.log.debug("Using %s" % self.job_name)
+ except IndexError:
+ raise SparkHandler.SparkJobContext.MaxConcurrentJobsReached()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self.job_name is not None:
+ self.log.debug("Returning %s" % self.job_name)
+ self.spark_job_stack.append(self.job_name)
+
+ def __init__(self, **kwargs):
+ import inspect
+ NexusHandler.__init__(self, **kwargs)
+ self._sc = None
+
+ self.spark_job_stack = []
+
+ def with_spark_job_context(calc_func):
+ from functools import wraps
+
+ @wraps(calc_func)
+ def wrapped(*args, **kwargs1):
+ try:
+ with SparkHandler.SparkJobContext(self.spark_job_stack) as job_context:
+ # TODO Pool and Job are forced to a 1-to-1 relationship
+ calc_func.im_self._sc.setLocalProperty("spark.scheduler.pool", job_context.job_name)
+ calc_func.im_self._sc.setJobGroup(job_context.job_name, "a spark job")
+ return calc_func(*args, **kwargs1)
+ except SparkHandler.SparkJobContext.MaxConcurrentJobsReached:
+ raise NexusProcessingException(code=503,
+ reason="Max concurrent requests reached. Please try again later.")
+
+ return wrapped
+
+ for member in inspect.getmembers(self, predicate=inspect.ismethod):
+ if member[0] == "calc":
+ setattr(self, member[0], with_spark_job_context(member[1]))
+
+ def set_spark_context(self, sc):
+ self._sc = sc
+
+ def set_config(self, algorithm_config):
+ max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
+ "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+ self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)])
+ self.algorithm_config = algorithm_config
+
+ def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
+ start_year=None, end_year=None, clim_month=None,
+ fill=-9999., spark_master=None, spark_nexecs=None,
+ spark_nparts=None):
+ self._ds = ds
+ self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
+ self._startTime = start_time
+ self._endTime = end_time
+ self._startYear = start_year
+ self._endYear = end_year
+ self._climMonth = clim_month
+ self._fill = fill
+ self._spark_master = spark_master
+ self._spark_nexecs = spark_nexecs
+ self._spark_nparts = spark_nparts
+
+ def _find_global_tile_set(self):
+ if type(self._ds) in (list,tuple):
+ ds = self._ds[0]
+ else:
+ ds = self._ds
+ ntiles = 0
+ ##################################################################
+ # Temporary workaround until we have dataset metadata to indicate
+ # temporal resolution.
+ if "monthly" in ds.lower():
+ t_incr = 2592000 # 30 days
+ else:
+ t_incr = 86400 # 1 day
+ ##################################################################
+ t = self._endTime
+ self._latRes = None
+ self._lonRes = None
+ while ntiles == 0:
+ nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=ds, start_time=t-t_incr, end_time=t)
+ ntiles = len(nexus_tiles)
+ self.log.debug('find_global_tile_set got {0} tiles'.format(ntiles))
+ if ntiles > 0:
+ for tile in nexus_tiles:
+ self.log.debug('tile coords:')
+ self.log.debug('tile lats: {0}'.format(tile.latitudes))
+ self.log.debug('tile lons: {0}'.format(tile.longitudes))
+ if self._latRes is None:
+ lats = tile.latitudes.data
+ if (len(lats) > 1):
+ self._latRes = abs(lats[1]-lats[0])
+ if self._lonRes is None:
+ lons = tile.longitudes.data
+ if (len(lons) > 1):
+ self._lonRes = abs(lons[1]-lons[0])
+ if ((self._latRes is not None) and
+ (self._lonRes is not None)):
+ break
+ if (self._latRes is None) or (self._lonRes is None):
+ ntiles = 0
+ else:
+ lats_agg = np.concatenate([tile.latitudes.compressed()
+ for tile in nexus_tiles])
+ lons_agg = np.concatenate([tile.longitudes.compressed()
+ for tile in nexus_tiles])
+ self._minLatCent = np.min(lats_agg)
+ self._maxLatCent = np.max(lats_agg)
+ self._minLonCent = np.min(lons_agg)
+ self._maxLonCent = np.max(lons_agg)
+ t -= t_incr
+ return nexus_tiles
+
+ def _find_tile_bounds(self, t):
+ lats = t.latitudes
+ lons = t.longitudes
+ if (len(lats.compressed()) > 0) and (len(lons.compressed()) > 0):
+ min_lat = np.ma.min(lats)
+ max_lat = np.ma.max(lats)
+ min_lon = np.ma.min(lons)
+ max_lon = np.ma.max(lons)
+ good_inds_lat = np.where(lats.mask == False)[0]
+ good_inds_lon = np.where(lons.mask == False)[0]
+ min_y = np.min(good_inds_lat)
+ max_y = np.max(good_inds_lat)
+ min_x = np.min(good_inds_lon)
+ max_x = np.max(good_inds_lon)
+ bounds = (min_lat, max_lat, min_lon, max_lon,
+ min_y, max_y, min_x, max_x)
+ else:
+ self.log.warn('Nothing in this tile!')
+ bounds = None
+ return bounds
+
+ @staticmethod
+ def query_by_parts(tile_service, min_lat, max_lat, min_lon, max_lon,
+ dataset, start_time, end_time, part_dim=0):
+ nexus_max_tiles_per_query = 100
+ #print 'trying query: ',min_lat, max_lat, min_lon, max_lon, \
+ # dataset, start_time, end_time
+ try:
+ tiles = \
+ tile_service.find_tiles_in_box(min_lat, max_lat,
+ min_lon, max_lon,
+ dataset,
+ start_time=start_time,
+ end_time=end_time,
+ fetch_data=False)
+ assert(len(tiles) <= nexus_max_tiles_per_query)
+ except:
+ #print 'failed query: ',min_lat, max_lat, min_lon, max_lon, \
+ # dataset, start_time, end_time
+ if part_dim == 0:
+ # Partition by latitude.
+ mid_lat = (min_lat + max_lat) / 2
+ nexus_tiles = SparkHandler.query_by_parts(tile_service,
+ min_lat, mid_lat,
+ min_lon, max_lon,
+ dataset,
+ start_time, end_time,
+ part_dim=part_dim)
+ nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
+ mid_lat,
+ max_lat,
+ min_lon,
+ max_lon,
+ dataset,
+ start_time,
+ end_time,
+ part_dim=part_dim))
+ elif part_dim == 1:
+ # Partition by longitude.
+ mid_lon = (min_lon + max_lon) / 2
+ nexus_tiles = SparkHandler.query_by_parts(tile_service,
+ min_lat, max_lat,
+ min_lon, mid_lon,
+ dataset,
+ start_time, end_time,
+ part_dim=part_dim)
+ nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
+ min_lat,
+ max_lat,
+ mid_lon,
+ max_lon,
+ dataset,
+ start_time,
+ end_time,
+ part_dim=part_dim))
+ elif part_dim == 2:
+ # Partition by time.
+ mid_time = (start_time + end_time) / 2
+ nexus_tiles = SparkHandler.query_by_parts(tile_service,
+ min_lat, max_lat,
+ min_lon, max_lon,
+ dataset,
+ start_time, mid_time,
+ part_dim=part_dim)
+ nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
+ min_lat,
+ max_lat,
+ min_lon,
+ max_lon,
+ dataset,
+ mid_time,
+ end_time,
+ part_dim=part_dim))
+ else:
+ # No exception, so query Cassandra for the tile data.
+ #print 'Making NEXUS query to Cassandra for %d tiles...' % \
+ # len(tiles)
+ #t1 = time.time()
+ #print 'NEXUS call start at time %f' % t1
+ #sys.stdout.flush()
+ nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
+ nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+ min_lon, max_lon,
+ nexus_tiles))
+ #t2 = time.time()
+ #print 'NEXUS call end at time %f' % t2
+ #print 'Seconds in NEXUS call: ', t2-t1
+ #sys.stdout.flush()
+
+ #print 'Returning %d tiles' % len(nexus_tiles)
+ return nexus_tiles
+
+ @staticmethod
+ def _prune_tiles(nexus_tiles):
+ del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0]
+ for i in np.flipud(del_ind):
+ del nexus_tiles[i]
+
+ def _lat2ind(self,lat):
+ return int((lat-self._minLatCent)/self._latRes)
+
+ def _lon2ind(self,lon):
+ return int((lon-self._minLonCent)/self._lonRes)
+
+ def _ind2lat(self,y):
+ return self._minLatCent+y*self._latRes
+
+ def _ind2lon(self,x):
+ return self._minLonCent+x*self._lonRes
+
+ def _create_nc_file_time1d(self, a, fname, varname, varunits=None,
+ fill=None):
+ self.log.debug('a={0}'.format(a))
+ self.log.debug('shape a = {0}'.format(a.shape))
+ assert len(a.shape) == 1
+ time_dim = len(a)
+ rootgrp = Dataset(fname, "w", format="NETCDF4")
+ rootgrp.createDimension("time", time_dim)
+ vals = rootgrp.createVariable(varname, "f4", dimensions=("time",),
+ fill_value=fill)
+ times = rootgrp.createVariable("time", "f4", dimensions=("time",))
+ vals[:] = [d['mean'] for d in a]
+ times[:] = [d['time'] for d in a]
+ if varunits is not None:
+ vals.units = varunits
+ times.units = 'seconds since 1970-01-01 00:00:00'
+ rootgrp.close()
+
+ def _create_nc_file_latlon2d(self, a, fname, varname, varunits=None,
+ fill=None):
+ self.log.debug('a={0}'.format(a))
+ self.log.debug('shape a = {0}'.format(a.shape))
+ assert len(a.shape) == 2
+ lat_dim, lon_dim = a.shape
+ rootgrp = Dataset(fname, "w", format="NETCDF4")
+ rootgrp.createDimension("lat", lat_dim)
+ rootgrp.createDimension("lon", lon_dim)
+ vals = rootgrp.createVariable(varname, "f4",
+ dimensions=("lat","lon",),
+ fill_value=fill)
+ lats = rootgrp.createVariable("lat", "f4", dimensions=("lat",))
+ lons = rootgrp.createVariable("lon", "f4", dimensions=("lon",))
+ vals[:,:] = a
+ lats[:] = np.linspace(self._minLatCent,
+ self._maxLatCent, lat_dim)
+ lons[:] = np.linspace(self._minLonCent,
+ self._maxLonCent, lon_dim)
+ if varunits is not None:
+ vals.units = varunits
+ lats.units = "degrees north"
+ lons.units = "degrees east"
+ rootgrp.close()
+
+ def _create_nc_file(self, a, fname, varname, **kwargs):
+ self._create_nc_file_latlon2d(a, fname, varname, **kwargs)
+
+
+def executeInitializers(config):
+ [wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/__init__.py b/analysis/webservice/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/Capabilities.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py
new file mode 100644
index 0000000..0477e67
--- /dev/null
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -0,0 +1,43 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+
+from webservice.NexusHandler import CalcHandler, nexus_handler, AVAILABLE_HANDLERS
+from webservice.webmodel import NexusResults
+
+
+@nexus_handler
+class CapabilitiesListHandlerImpl(CalcHandler):
+ name = "Capabilities"
+ path = "/capabilities"
+ description = "Lists the current capabilities of this Nexus system"
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ CalcHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ capabilities = []
+
+ for capability in AVAILABLE_HANDLERS:
+ capabilityDef = {
+ "name": capability.name(),
+ "path": capability.path(),
+ "description": capability.description(),
+ "parameters": capability.params()
+ }
+ capabilities.append(capabilityDef)
+
+ return CapabilitiesResults(capabilities)
+
+
+class CapabilitiesResults(NexusResults):
+ def __init__(self, capabilities):
+ NexusResults.__init__(self)
+ self.__capabilities = capabilities
+
+ def toJson(self):
+ return json.dumps(self.__capabilities, indent=4)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/CorrelationMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py
new file mode 100644
index 0000000..5064a7f
--- /dev/null
+++ b/analysis/webservice/algorithms/CorrelationMap.py
@@ -0,0 +1,129 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+import math
+import numpy as np
+from shapely.geometry import box
+from scipy.stats import linregress
+from itertools import groupby
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults
+from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
+
+
+@nexus_handler
+class LongitudeLatitudeMapHandlerImpl(NexusHandler):
+ name = "Correlation Map"
+ path = "/correlationMap"
+ description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ params["res"] = {
+ "name": "Resolution",
+ "type": "float",
+ "description": "The resolution of the resulting correlation map"
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ minLat = computeOptions.get_min_lat()
+ maxLat = computeOptions.get_max_lat()
+ minLon = computeOptions.get_min_lon()
+ maxLon = computeOptions.get_max_lon()
+ ds = computeOptions.get_dataset()
+ startTime = computeOptions.get_start_time()
+ endTime = computeOptions.get_end_time()
+ resolution = computeOptions.get_decimal_arg("res", default=1.0)
+
+ if not len(ds) == 2:
+ raise Exception("Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2")
+
+ ds1tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[0], startTime,
+ endTime)
+ ds2tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[1], startTime,
+ endTime)
+
+ matches = self._match_tiles(ds1tiles, ds2tiles)
+
+ if len(matches) == 0:
+ raise NexusProcessingException(reason="Could not find any data temporally co-located")
+
+ results = [[{
+ 'cnt': 0,
+ 'slope': 0,
+ 'intercept': 0,
+ 'r': 0,
+ 'p': 0,
+ 'stderr': 0,
+ 'lat': float(lat),
+ 'lon': float(lon)
+ } for lon in np.arange(minLon, maxLon, resolution)] for lat in
+ np.arange(minLat, maxLat, resolution)]
+
+ for stats in results:
+ for stat in stats:
+ values_x = []
+ values_y = []
+ for tile_matches in matches:
+
+ tile_1_list = tile_matches[0]
+ value_1 = get_approximate_value_for_lat_lon(tile_1_list, stat["lat"], stat["lon"])
+
+ tile_2_list = tile_matches[1]
+ value_2 = get_approximate_value_for_lat_lon(tile_2_list, stat["lat"], stat["lon"])
+
+ if not (math.isnan(value_1) or math.isnan(value_2)):
+ values_x.append(value_1)
+ values_y.append(value_2)
+
+ if len(values_x) > 2 and len(values_y) > 2:
+ stats = linregress(values_x, values_y)
+
+ stat["slope"] = stats[0] if not math.isnan(stats[0]) and not math.isinf(stats[0]) else str(stats[0])
+ stat["intercept"] = stats[1] if not math.isnan(stats[1]) and not math.isinf(stats[1]) else str(
+ stats[1])
+ stat["r"] = stats[2] if not math.isnan(stats[2]) and not math.isinf(stats[2]) else str(stats[2])
+ stat["p"] = stats[3] if not math.isnan(stats[3]) and not math.isinf(stats[3]) else str(stats[3])
+ stat["stderr"] = stats[4] if not math.isnan(stats[4]) and not math.isinf(stats[4]) else str(
+ stats[4])
+ stat["cnt"] = len(values_x)
+
+ return CorrelationResults(results)
+
+ def _match_tiles(self, tiles_1, tiles_2):
+
+ date_map = {}
+
+ tiles_1 = sorted(tiles_1, key=lambda tile: tile.min_time)
+ for tile_start_time, tiles in groupby(tiles_1, lambda tile: tile.min_time):
+ if not tile_start_time in date_map:
+ date_map[tile_start_time] = []
+ date_map[tile_start_time].append(list(tiles))
+
+ tiles_2 = sorted(tiles_2, key=lambda tile: tile.min_time)
+ for tile_start_time, tiles in groupby(tiles_2, lambda tile: tile.min_time):
+ if not tile_start_time in date_map:
+ date_map[tile_start_time] = []
+ date_map[tile_start_time].append(list(tiles))
+
+ matches = [tiles for a_time, tiles in date_map.iteritems() if len(tiles) == 2]
+
+ return matches
+
+
+class CorrelationResults(NexusResults):
+ def __init__(self, results):
+ NexusResults.__init__(self)
+ self.results = results
+
+ def toJson(self):
+ json_d = {
+ "stats": {},
+ "meta": [None, None],
+ "data": self.results
+ }
+ return json.dumps(json_d, indent=4)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DailyDifferenceAverage.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
new file mode 100644
index 0000000..1518c9e
--- /dev/null
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -0,0 +1,230 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import sys
+import traceback
+from datetime import datetime, timedelta
+from multiprocessing.dummy import Pool, Manager
+from shapely.geometry import box
+
+import numpy as np
+import pytz
+from nexustiles.nexustiles import NexusTileService, NexusTileServiceException
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+
+
+@nexus_handler
+class DailyDifferenceAverageImpl(NexusHandler):
+ name = "Daily Difference Average"
+ path = "/dailydifferenceaverage"
+ description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
+ params = {
+ "ds1": {
+ "name": "Dataset 1",
+ "type": "string",
+ "description": "The first Dataset shortname to use in calculation"
+ },
+ "ds2": {
+ "name": "Dataset 2",
+ "type": "string",
+ "description": "The second Dataset shortname to use in calculation"
+ },
+ "minLon": {
+ "name": "Minimum Longitude",
+ "type": "float",
+ "description": "Minimum (Western) bounding box Longitude"
+ },
+ "minLat": {
+ "name": "Minimum Latitude",
+ "type": "float",
+ "description": "Minimum (Southern) bounding box Latitude"
+ },
+ "maxLon": {
+ "name": "Maximum Longitude",
+ "type": "float",
+ "description": "Maximum (Eastern) bounding box Longitude"
+ },
+ "maxLat": {
+ "name": "Maximum Latitude",
+ "type": "float",
+ "description": "Maximum (Northern) bounding box Latitude"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "long integer",
+ "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "long integer",
+ "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+
+ def calc(self, request, **args):
+ min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
+ dataset1 = request.get_argument("ds1", None)
+ dataset2 = request.get_argument("ds2", None)
+ start_time = request.get_start_time()
+ end_time = request.get_end_time()
+
+ simple = request.get_argument("simple", None) is not None
+
+ averagebyday = self.get_daily_difference_average_for_box(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+ start_time, end_time)
+
+ averagebyday = sorted(averagebyday, key=lambda dayavg: dayavg[0])
+
+ if simple:
+
+ import matplotlib.pyplot as plt
+ from matplotlib.dates import date2num
+
+ times = [date2num(self.date_from_ms(dayavg[0])) for dayavg in averagebyday]
+ means = [dayavg[1] for dayavg in averagebyday]
+ plt.plot_date(times, means, ls='solid')
+
+ plt.xlabel('Date')
+ plt.xticks(rotation=70)
+ plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
+ plt.title('Sea Surface Temperature (SST) Anomalies')
+ plt.grid(True)
+ plt.tight_layout()
+ plt.savefig("test.png")
+
+ return averagebyday, None, None
+ else:
+
+ result = NexusResults(
+ results=[[{'time': dayms, 'mean': avg, 'ds': 0}] for dayms, avg in averagebyday],
+ stats={},
+ meta=self.get_meta())
+
+ result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+ result.meta()['label'] = u'Difference from 5-Day mean (\u00B0C)'
+
+ return result
+
+ def date_from_ms(self, dayms):
+ base_datetime = datetime(1970, 1, 1)
+ delta = timedelta(0, 0, 0, dayms)
+ return base_datetime + delta
+
+ def get_meta(self):
+ meta = {
+ "title": "Sea Surface Temperature (SST) Anomalies",
+ "description": "SST anomolies are departures from the 5-day pixel mean",
+ "units": u'\u00B0C',
+ }
+ return meta
+
+ def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+ start_time,
+ end_time):
+
+ daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1,
+ start_time, end_time)
+
+ maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+ if maxprocesses == 1:
+ calculator = DailyDifferenceAverageCalculator()
+ averagebyday = []
+ for dayinseconds in daysinrange:
+ result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+ dayinseconds)
+ averagebyday.append((result[0], result[1]))
+ else:
+ # Create a task to calc average difference for each day
+ manager = Manager()
+ work_queue = manager.Queue()
+ done_queue = manager.Queue()
+ for dayinseconds in daysinrange:
+ work_queue.put(
+ ('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds))
+ [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+ # Start new processes to handle the work
+ pool = Pool(maxprocesses)
+ [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+ pool.close()
+
+ # Collect the results as [(day (in ms), average difference for that day)]
+ averagebyday = []
+ for i in xrange(0, len(daysinrange)):
+ result = done_queue.get()
+ if result[0] == 'error':
+ print >> sys.stderr, result[1]
+ raise NexusProcessingException(reason="Error calculating average by day.")
+ rdata = result
+ averagebyday.append((rdata[0], rdata[1]))
+
+ pool.terminate()
+ manager.shutdown()
+
+ return averagebyday
+
+
+class DailyDifferenceAverageCalculator(object):
+ def __init__(self):
+ self.__tile_service = NexusTileService()
+
+ def calc_average_diff_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, timeinseconds):
+
+ day_of_year = datetime.fromtimestamp(timeinseconds, pytz.utc).timetuple().tm_yday
+
+ ds1_nexus_tiles = self.__tile_service.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon,
+ dataset1,
+ timeinseconds)
+
+ # Initialize list of differences
+ differences = []
+ # For each ds1tile
+ for ds1_tile in ds1_nexus_tiles:
+ # Get tile for ds2 using bbox from ds1_tile and day ms
+ try:
+ ds2_tile = self.__tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
+ box(ds1_tile.bbox.min_lon, ds1_tile.bbox.min_lat, ds1_tile.bbox.max_lon, ds1_tile.bbox.max_lat),
+ dataset2, day_of_year)[0]
+ # Subtract ds2 tile from ds1 tile
+ diff = np.subtract(ds1_tile.data, ds2_tile.data)
+ except NexusTileServiceException:
+ # This happens when there is data in ds1tile but all NaNs in ds2tile because the
+ # Solr query being used filters out results where stats_count = 0.
+ # Technically, this should never happen if ds2 is a climatology generated in part from ds1
+ # and it is probably a data error
+
+ # For now, just treat ds2 as an array of all masked data (which essentially discards the ds1 data)
+ ds2_tile = np.ma.masked_all(ds1_tile.data.shape)
+ diff = np.subtract(ds1_tile.data, ds2_tile)
+
+ # Put results in list of differences
+ differences.append(np.ma.array(diff).ravel())
+
+ # Average List of differences
+ diffaverage = np.ma.mean(differences).item()
+ # Return Average by day
+ return int(timeinseconds), diffaverage
+
+
+def pool_worker(work_queue, done_queue):
+ try:
+ calculator = DailyDifferenceAverageCalculator()
+
+ for work in iter(work_queue.get, SENTINEL):
+ scifunction = work[0]
+ args = work[1:]
+ result = calculator.__getattribute__(scifunction)(*args)
+ done_queue.put(result)
+ except Exception as e:
+ e_str = traceback.format_exc(e)
+ done_queue.put(('error', e_str))
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DataInBoundsSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py
new file mode 100644
index 0000000..f9aa609
--- /dev/null
+++ b/analysis/webservice/algorithms/DataInBoundsSearch.py
@@ -0,0 +1,205 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import logging
+from datetime import datetime
+from pytz import timezone
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DataInBoundsSearchHandlerImpl(NexusHandler):
+ name = "Data In-Bounds Search"
+ path = "/datainbounds"
+ description = "Fetches point values for a given dataset and geographical area"
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "The Dataset shortname to use in calculation. Required"
+ },
+ "parameter": {
+ "name": "Parameter",
+ "type": "string",
+ "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ try:
+ ds = request.get_dataset()[0]
+ except:
+ raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+ parameter_s = request.get_argument('parameter', None)
+ if parameter_s not in ['sst', 'sss', 'wind', None]:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ start_time = long((start_time - EPOCH).total_seconds())
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ end_time = long((end_time - EPOCH).total_seconds())
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ return ds, parameter_s, start_time, end_time, bounding_polygon
+
+ def calc(self, computeOptions, **args):
+ ds, parameter, start_time, end_time, bounding_polygon = self.parse_arguments(computeOptions)
+
+ includemeta = computeOptions.get_include_meta()
+
+ min_lat = bounding_polygon.bounds[1]
+ max_lat = bounding_polygon.bounds[3]
+ min_lon = bounding_polygon.bounds[0]
+ max_lon = bounding_polygon.bounds[2]
+
+ tiles = self._tile_service.get_tiles_bounded_by_box(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+ end_time)
+
+ data = []
+ for tile in tiles:
+ for nexus_point in tile.nexus_point_generator():
+
+ point = dict()
+ point['id'] = tile.tile_id
+
+ if parameter == 'sst':
+ point['sst'] = nexus_point.data_val
+ elif parameter == 'sss':
+ point['sss'] = nexus_point.data_val
+ elif parameter == 'wind':
+ point['wind_u'] = nexus_point.data_val
+ try:
+ point['wind_v'] = tile.meta_data['wind_v'][tuple(nexus_point.index)]
+ except (KeyError, IndexError):
+ pass
+ try:
+ point['wind_direction'] = tile.meta_data['wind_dir'][tuple(nexus_point.index)]
+ except (KeyError, IndexError):
+ pass
+ try:
+ point['wind_speed'] = tile.meta_data['wind_speed'][tuple(nexus_point.index)]
+ except (KeyError, IndexError):
+ pass
+ else:
+ pass
+
+ data.append({
+ 'latitude': nexus_point.latitude,
+ 'longitude': nexus_point.longitude,
+ 'time': nexus_point.time,
+ 'data': [
+ point
+ ]
+ })
+
+ if includemeta and len(tiles) > 0:
+ meta = [tile.get_summary() for tile in tiles]
+ else:
+ meta = None
+
+ result = DataInBoundsResult(
+ results=data,
+ stats={},
+ meta=meta)
+
+ result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+
+ return result
+
+
+class DataInBoundsResult(NexusResults):
+ def toCSV(self):
+ rows = []
+
+ headers = [
+ "id",
+ "lon",
+ "lat",
+ "time"
+ ]
+
+ for i, result in enumerate(self.results()):
+ cols = []
+
+ cols.append(str(result['data'][0]['id']))
+ cols.append(str(result['longitude']))
+ cols.append(str(result['latitude']))
+ cols.append(datetime.utcfromtimestamp(result["time"]).strftime('%Y-%m-%dT%H:%M:%SZ'))
+ if 'sst' in result['data'][0]:
+ cols.append(str(result['data'][0]['sst']))
+ if i == 0:
+ headers.append("sea_water_temperature")
+ elif 'sss' in result['data'][0]:
+ cols.append(str(result['data'][0]['sss']))
+ if i == 0:
+ headers.append("sea_water_salinity")
+ elif 'wind_u' in result['data'][0]:
+ cols.append(str(result['data'][0]['wind_u']))
+ cols.append(str(result['data'][0]['wind_v']))
+ cols.append(str(result['data'][0]['wind_direction']))
+ cols.append(str(result['data'][0]['wind_speed']))
+ if i == 0:
+ headers.append("eastward_wind")
+ headers.append("northward_wind")
+ headers.append("wind_direction")
+ headers.append("wind_speed")
+
+ if i == 0:
+ rows.append(",".join(headers))
+ rows.append(",".join(cols))
+
+ return "\r\n".join(rows)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DataSeriesList.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py
new file mode 100644
index 0000000..399742a
--- /dev/null
+++ b/analysis/webservice/algorithms/DataSeriesList.py
@@ -0,0 +1,30 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+from webservice.NexusHandler import NexusHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import cached
+
+
+@nexus_handler
+class DataSeriesListHandlerImpl(NexusHandler):
+ name = "Dataset List"
+ path = "/list"
+ description = "Lists datasets currently available for analysis"
+ params = {}
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+
+ @cached(ttl=(60 * 60 * 1000)) # 1 hour cached
+ def calc(self, computeOptions, **args):
+ class SimpleResult(object):
+ def __init__(self, result):
+ self.result = result
+
+ def toJson(self):
+ return json.dumps(self.result)
+
+ return SimpleResult(self._tile_service.get_dataseries_list())
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DelayTest.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py
new file mode 100644
index 0000000..85bdb8c
--- /dev/null
+++ b/analysis/webservice/algorithms/DelayTest.py
@@ -0,0 +1,29 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import time
+
+from webservice.NexusHandler import CalcHandler
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class DelayHandlerImpl(CalcHandler):
+ name = "Delay"
+ path = "/delay"
+ description = "Waits a little while"
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ CalcHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ time.sleep(10)
+
+ class SimpleResult(object):
+ def toJson(self):
+ return ""
+
+ return SimpleResult()
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/ErrorTosserTest.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py
new file mode 100644
index 0000000..3f791d5
--- /dev/null
+++ b/analysis/webservice/algorithms/ErrorTosserTest.py
@@ -0,0 +1,23 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+from webservice.NexusHandler import CalcHandler, nexus_handler
+
+
+@nexus_handler
+class ErrorTosserHandler(CalcHandler):
+ name = "MakeError"
+ path = "/makeerror"
+ description = "Causes an error"
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ CalcHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ a = 100 / 0.0
+ # raise Exception("I'm Mad!")
+ # raise NexusProcessingException.NexusProcessingException(NexusProcessingException.StandardNexusErrors.UNKNOWN, "I'm Mad!")
+ return {}, None, None
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/Heartbeat.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py
new file mode 100644
index 0000000..a462739
--- /dev/null
+++ b/analysis/webservice/algorithms/Heartbeat.py
@@ -0,0 +1,40 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, AVAILABLE_HANDLERS
+from webservice.webmodel import NexusResults
+
+
+@nexus_handler
+class HeartbeatHandlerImpl(NexusHandler):
+ name = "Backend Services Status"
+ path = "/heartbeat"
+ description = "Returns health status of Nexus backend services"
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+
+ def calc(self, computeOptions, **args):
+ solrOnline = self._tile_service.pingSolr()
+
+ # Not sure how to best check cassandra cluster status so just return True for now
+ cassOnline = True
+
+ if solrOnline and cassOnline:
+ status = {"online": True}
+ else:
+ status = {"online": False}
+
+ class SimpleResult(object):
+ def __init__(self, result):
+ self.result = result
+
+ def toJson(self):
+ return json.dumps(self.result)
+
+ return SimpleResult(status)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/HofMoeller.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py
new file mode 100644
index 0000000..98cff8d
--- /dev/null
+++ b/analysis/webservice/algorithms/HofMoeller.py
@@ -0,0 +1,362 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import itertools
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.pyplot as plt
+import mpld3
+import numpy as np
+from matplotlib import cm
+from matplotlib.ticker import FuncFormatter
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults
+
+SENTINEL = 'STOP'
+LATITUDE = 0
+LONGITUDE = 1
+
+
+class LongitudeHofMoellerCalculator(object):
+ def longitude_time_hofmoeller_stats(self, tile, index):
+ stat = {
+ 'sequence': index,
+ 'time': np.ma.min(tile.times),
+ 'lons': []
+ }
+
+ points = list(tile.nexus_point_generator())
+ data = sorted(points, key=lambda p: p.longitude)
+ points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
+
+ for lon, points_at_lon in points_by_lon:
+ values_at_lon = np.array([point.data_val for point in points_at_lon])
+ stat['lons'].append({
+ 'longitude': float(lon),
+ 'cnt': len(values_at_lon),
+ 'avg': np.mean(values_at_lon).item(),
+ 'max': np.max(values_at_lon).item(),
+ 'min': np.min(values_at_lon).item(),
+ 'std': np.std(values_at_lon).item()
+ })
+
+ return stat
+
+
+class LatitudeHofMoellerCalculator(object):
+ def latitude_time_hofmoeller_stats(self, tile, index):
+ stat = {
+ 'sequence': index,
+ 'time': np.ma.min(tile.times),
+ 'lats': []
+ }
+
+ points = list(tile.nexus_point_generator())
+ data = sorted(points, key=lambda p: p.latitude)
+ points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
+
+ for lat, points_at_lat in points_by_lat:
+ values_at_lat = np.array([point.data_val for point in points_at_lat])
+
+ stat['lats'].append({
+ 'latitude': float(lat),
+ 'cnt': len(values_at_lat),
+ 'avg': np.mean(values_at_lat).item(),
+ 'max': np.max(values_at_lat).item(),
+ 'min': np.min(values_at_lat).item(),
+ 'std': np.std(values_at_lat).item()
+ })
+
+ return stat
+
+
+class BaseHoffMoellerHandlerImpl(NexusHandler):
+ def __init__(self):
+ NexusHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
+ shape = (len(results), len(results[0][pivot]))
+ if shape[0] <= 12:
+ return results
+ for a in range(0, 12):
+ values = []
+ for b in range(a, len(results), 12):
+ values.append(np.average([l[field] for l in results[b][pivot]]))
+ avg = np.average(values)
+
+ for b in range(a, len(results), 12):
+ for l in results[b][pivot]:
+ l["%sSeasonal" % field] = l[field] - avg
+
+ return results
+
+ def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append)
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
+ return results
+
+
+@nexus_handler
+class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
+ name = "Latitude/Time HofMoeller"
+ path = "/latitudeTimeHofMoeller"
+ description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ BaseHoffMoellerHandlerImpl.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+ computeOptions.get_dataset()[0],
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time())
+
+ if len(tiles) == 0:
+ raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+ maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+ results = []
+ if maxprocesses == 1:
+ calculator = LatitudeHofMoellerCalculator()
+ for x, tile in enumerate(tiles):
+ result = calculator.latitude_time_hofmoeller_stats(tile, x)
+ results.append(result)
+ else:
+ manager = Manager()
+ work_queue = manager.Queue()
+ done_queue = manager.Queue()
+ for x, tile in enumerate(tiles):
+ work_queue.put(
+ ('latitude_time_hofmoeller_stats', tile, x))
+ [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+ # Start new processes to handle the work
+ pool = Pool(maxprocesses)
+ [pool.apply_async(pool_worker, (LATITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+ pool.close()
+
+ # Collect the results
+ for x, tile in enumerate(tiles):
+ result = done_queue.get()
+ try:
+ error_str = result['error']
+ self.log.error(error_str)
+ raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.")
+ except KeyError:
+ pass
+
+ results.append(result)
+
+ pool.terminate()
+ manager.shutdown()
+
+ results = sorted(results, key=lambda entry: entry["time"])
+
+ results = self.applyDeseasonToHofMoeller(results)
+
+ result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+ return result
+
+
+@nexus_handler
+class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
+ name = "Longitude/Time HofMoeller"
+ path = "/longitudeTimeHofMoeller"
+ description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ BaseHoffMoellerHandlerImpl.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+ computeOptions.get_dataset()[0],
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time())
+
+ if len(tiles) == 0:
+ raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+ maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+ results = []
+ if maxprocesses == 1:
+ calculator = LongitudeHofMoellerCalculator()
+ for x, tile in enumerate(tiles):
+ result = calculator.longitude_time_hofmoeller_stats(tile, x)
+ results.append(result)
+ else:
+ manager = Manager()
+ work_queue = manager.Queue()
+ done_queue = manager.Queue()
+ for x, tile in enumerate(tiles):
+ work_queue.put(
+ ('longitude_time_hofmoeller_stats', tile, x))
+ [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+ # Start new processes to handle the work
+ pool = Pool(maxprocesses)
+ [pool.apply_async(pool_worker, (LONGITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+ pool.close()
+
+ # Collect the results
+ for x, tile in enumerate(tiles):
+ result = done_queue.get()
+ try:
+ error_str = result['error']
+ self.log.error(error_str)
+ raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.")
+ except KeyError:
+ pass
+
+ results.append(result)
+
+ pool.terminate()
+ manager.shutdown()
+
+ results = sorted(results, key=lambda entry: entry["time"])
+
+ results = self.applyDeseasonToHofMoeller(results, pivot="lons")
+
+ result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+ return result
+
+
+class HoffMoellerResults(NexusResults):
+ LATITUDE = 0
+ LONGITUDE = 1
+
+ def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
+ NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+ self.__type = args['type']
+
+ def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
+ cmap = cm.coolwarm
+ # ls = LightSource(315, 45)
+ # rgb = ls.shade(data, cmap)
+
+ fig, ax = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)
+
+ def yFormatter(y, pos):
+ if y < len(coordSeries):
+ return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
+ else:
+ return ""
+
+ def xFormatter(x, pos):
+ if x < len(timeSeries):
+ return timeSeries[int(x)].strftime('%b %Y')
+ else:
+ return ""
+
+ ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
+ ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
+
+ ax.set_title(title)
+ ax.set_ylabel(coordName)
+ ax.set_xlabel('Date')
+
+ fig.colorbar(cax)
+ fig.autofmt_xdate()
+
+ labels = ['point {0}'.format(i + 1) for i in range(len(data))]
+ # plugins.connect(fig, plugins.MousePosition(fontsize=14))
+ tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+ def createLongitudeHoffmueller(self, res, meta):
+ lonSeries = [m['longitude'] for m in res[0]['lons']]
+ timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+ data = np.zeros((len(lonSeries), len(timeSeries)))
+
+ plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "avg"
+
+ for t in range(0, len(timeSeries)):
+ timeSet = res[t]
+ for l in range(0, len(lonSeries)):
+ latSet = timeSet['lons'][l]
+ value = latSet[plotSeries]
+ data[len(lonSeries) - l - 1][t] = value
+
+ title = meta['title']
+ source = meta['source']
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
+ "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+ def createLatitudeHoffmueller(self, res, meta):
+ latSeries = [m['latitude'] for m in res[0]['lats']]
+ timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+ data = np.zeros((len(latSeries), len(timeSeries)))
+
+ plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "avg"
+
+ for t in range(0, len(timeSeries)):
+ timeSet = res[t]
+ for l in range(0, len(latSeries)):
+ latSet = timeSet['lats'][l]
+ value = latSet[plotSeries]
+ data[len(latSeries) - l - 1][t] = value
+
+ title = meta['title']
+ source = meta['source']
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
+ title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+ def toImage(self):
+ res = self.results()
+ meta = self.meta()
+
+ if self.__type == HoffMoellerResults.LATITUDE:
+ return self.createLatitudeHoffmueller(res, meta)
+ elif self.__type == HoffMoellerResults.LONGITUDE:
+ return self.createLongitudeHoffmueller(res, meta)
+ else:
+ raise Exception("Unsupported HoffMoeller Plot Type")
+
+
+def pool_worker(type, work_queue, done_queue):
+ try:
+
+ if type == LATITUDE:
+ calculator = LatitudeHofMoellerCalculator()
+ elif type == LONGITUDE:
+ calculator = LongitudeHofMoellerCalculator()
+
+ for work in iter(work_queue.get, SENTINEL):
+ scifunction = work[0]
+ args = work[1:]
+ result = calculator.__getattribute__(scifunction)(*args)
+ done_queue.put(result)
+
+ except Exception as e:
+ e_str = traceback.format_exc(e)
+ done_queue.put({'error': e_str})
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/LongitudeLatitudeMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
new file mode 100644
index 0000000..be9123e
--- /dev/null
+++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
@@ -0,0 +1,249 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import logging
+import math
+from datetime import datetime
+
+from pytz import timezone
+from shapely.geometry import box
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+tile_service = None
+
+
+@nexus_handler
+class LongitudeLatitudeMapHandlerImpl(NexusHandler):
+ name = "Longitude/Latitude Time Average Map"
+ path = "/longitudeLatitudeMap"
+ description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "One or more comma-separated dataset shortnames"
+ },
+ "minLat": {
+ "name": "Minimum Latitude",
+ "type": "float",
+ "description": "Minimum (Southern) bounding box Latitude"
+ },
+ "maxLat": {
+ "name": "Maximum Latitude",
+ "type": "float",
+ "description": "Maximum (Northern) bounding box Latitude"
+ },
+ "minLon": {
+ "name": "Minimum Longitude",
+ "type": "float",
+ "description": "Minimum (Western) bounding box Longitude"
+ },
+ "maxLon": {
+ "name": "Maximum Longitude",
+ "type": "float",
+ "description": "Maximum (Eastern) bounding box Longitude"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ NexusHandler.__init__(self, skipCassandra=True)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+ try:
+ ds = request.get_dataset()[0]
+ except:
+ raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+ try:
+ bounding_polygon = box(request.get_min_lon(), request.get_min_lat(), request.get_max_lon(),
+ request.get_max_lat())
+ except:
+ raise NexusProcessingException(
+ reason="'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required.",
+ code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+
+ def calc(self, request, **args):
+
+ ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch = self.parse_arguments(request)
+
+ boxes = self._tile_service.get_distinct_bounding_boxes_in_polygon(bounding_polygon, ds,
+ start_seconds_from_epoch,
+ end_seconds_from_epoch)
+ point_avg_over_time = lat_lon_map_driver(bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, ds,
+ [a_box.bounds for a_box in boxes])
+
+ kwargs = {
+ "minLon": bounding_polygon.bounds[0],
+ "minLat": bounding_polygon.bounds[1],
+ "maxLon": bounding_polygon.bounds[2],
+ "maxLat": bounding_polygon.bounds[3],
+ "ds": ds,
+ "startTime": datetime.utcfromtimestamp(start_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ'),
+ "endTime": datetime.utcfromtimestamp(end_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ')
+ }
+ return LongitudeLatitudeMapResults(
+ results=LongitudeLatitudeMapHandlerImpl.results_to_dicts(point_avg_over_time), meta=None,
+ **kwargs)
+
+ @staticmethod
+ def results_to_dicts(results):
+
+ # ((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt))
+ return [{
+ 'lon': result[0][0],
+ 'lat': result[0][1],
+ 'slope': result[1][0] if not math.isnan(result[1][0]) else 'NaN',
+ 'intercept': result[1][1] if not math.isnan(result[1][1]) else 'NaN',
+ 'r': result[1][2],
+ 'p': result[1][3],
+ 'stderr': result[1][4] if not math.isinf(result[1][4]) else 'Inf',
+ 'avg': result[1][5],
+ 'max': result[1][6],
+ 'min': result[1][7],
+ 'std': result[1][8],
+ 'cnt': result[1][9],
+ } for result in results]
+
+
+def pool_initializer():
+ from nexustiles.nexustiles import NexusTileService
+ global tile_service
+ tile_service = NexusTileService()
+ # TODO This is a hack to make sure each sub-process uses it's own connection to cassandra. data-access needs to be updated
+ from cassandra.cqlengine import connection
+ from multiprocessing import current_process
+
+ connection.register_connection(current_process().name, [host.address for host in connection.get_session().hosts])
+ connection.set_default_connection(current_process().name)
+
+
+def lat_lon_map_driver(search_bounding_polygon, search_start, search_end, ds, distinct_boxes):
+ from functools import partial
+ from nexustiles.nexustiles import NexusTileService
+ # Start new processes to handle the work
+ # pool = Pool(5, pool_initializer)
+
+ func = partial(regression_on_tiles, search_bounding_polygon_wkt=search_bounding_polygon.wkt,
+ search_start=search_start, search_end=search_end, ds=ds)
+
+ global tile_service
+ tile_service = NexusTileService()
+ map_result = map(func, distinct_boxes)
+ return [item for sublist in map_result for item in sublist]
+ # TODO Use for multiprocessing:
+ # result = pool.map_async(func, distinct_boxes)
+ #
+ # pool.close()
+ # while not result.ready():
+ # print "Not ready"
+ # time.sleep(5)
+ #
+ # print result.successful()
+ # print result.get()[0]
+ # pool.join()
+ # pool.terminate()
+ #
+ # return [item for sublist in result.get() for item in sublist]
+
+
+def calc_linear_regression(arry, xarry):
+ from scipy.stats import linregress
+ slope, intercept, r_value, p_value, std_err = linregress(arry, xarry)
+ return slope, intercept, r_value, p_value, std_err
+
+
+def regression_on_tiles(tile_bounds, search_bounding_polygon_wkt, search_start, search_end, ds):
+ if len(tile_bounds) < 1:
+ return []
+
+ import numpy as np
+ import operator
+ from shapely import wkt
+ from shapely.geometry import box
+ search_bounding_shape = wkt.loads(search_bounding_polygon_wkt)
+ tile_bounding_shape = box(*tile_bounds)
+
+ # Load all tiles for given (exact) bounding box across the search time range
+ tiles = tile_service.find_tiles_by_exact_bounds(tile_bounds, ds, search_start, search_end)
+ if search_bounding_shape.contains(tile_bounding_shape):
+ # The tile bounds are totally contained in the search area, we don't need to mask it.
+ pass
+ else:
+ # The tile bounds cross the search area borders, we need to mask the tiles to the search area
+ tiles = tile_service.mask_tiles_to_polygon(wkt.loads(search_bounding_polygon_wkt), tiles)
+ # If all tiles end up being masked, there is no work to do
+ if len(tiles) < 1:
+ return []
+ tiles.sort(key=operator.attrgetter('min_time'))
+
+ stacked_tile_data = np.stack(tuple([np.squeeze(tile.data, 0) for tile in tiles]))
+ stacked_tile_lons = np.stack([tile.longitudes for tile in tiles])
+ stacked_tile_lats = np.stack([tile.latitudes for tile in tiles])
+
+ x_array = np.arange(stacked_tile_data.shape[0])
+ point_regressions = np.apply_along_axis(calc_linear_regression, 0, stacked_tile_data, x_array)
+ point_means = np.nanmean(stacked_tile_data, 0)
+ point_maximums = np.nanmax(stacked_tile_data, 0)
+ point_minimums = np.nanmin(stacked_tile_data, 0)
+ point_st_deviations = np.nanstd(stacked_tile_data, 0)
+ point_counts = np.ma.count(np.ma.masked_invalid(stacked_tile_data), 0)
+
+ results = []
+ for lat_idx, lon_idx in np.ndindex(point_means.shape):
+ lon, lat = np.max(stacked_tile_lons[:, lon_idx]), np.max(stacked_tile_lats[:, lat_idx])
+ pcnt = point_counts[lat_idx, lon_idx]
+
+ if pcnt == 0:
+ continue
+
+ mean = point_means[lat_idx, lon_idx]
+ pmax = point_maximums[lat_idx, lon_idx]
+ pmin = point_minimums[lat_idx, lon_idx]
+ pstd = point_st_deviations[lat_idx, lon_idx]
+ slope, intercept, r_value, p_value, std_err = point_regressions[:, lat_idx, lon_idx]
+
+ results.append(((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt)))
+
+ return results
+
+
+class LongitudeLatitudeMapResults(NexusResults):
+ def __init__(self, results=None, meta=None, computeOptions=None, **kwargs):
+ NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions, **kwargs)