Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:27 UTC

[50/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/tests/algorithms_spark/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py
new file mode 100644
index 0000000..bd9282c
--- /dev/null
+++ b/analysis/tests/algorithms_spark/__init__.py
@@ -0,0 +1,4 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/Filtering.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/Filtering.py b/analysis/webservice/Filtering.py
new file mode 100644
index 0000000..08b976e
--- /dev/null
+++ b/analysis/webservice/Filtering.py
@@ -0,0 +1,157 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+
+import math
+
+import logging
+import traceback
+
+import numpy as np
+from scipy import stats
+from scipy.fftpack import fft
+from scipy.ndimage.interpolation import zoom
+from scipy.interpolate import UnivariateSpline
+from scipy.signal import wiener, filtfilt, butter, gaussian, freqz
+from scipy.ndimage import filters
+
+log = logging.getLogger('Filtering')
+
+
+def __fieldToList(results, field):
+    a = np.zeros(len(results))
+    for n in range(0, len(results)):
+        a[n] = results[n][field]
+    return a
+
+
+def __listToField(results, l, field):
+    if results is None or l is None:
+        raise Exception("Cannot transpose values if they're null")
+
+    if len(results) != len(l):
+        raise Exception("Cannot transpose values between lists of unequal length")
+
+    for n in range(0, len(results)):
+        results[n][field] = l[n]
+
+
+def applySeasonalCycleFilter1d(l):
+    if len(l) <= 12:
+        return l
+
+    for a in range(0, 12):
+        values = []
+        for b in range(a, len(l), 12):
+            values.append(l[b])
+        avg = np.average(values)
+        for b in range(a, len(l), 12):
+            l[b] -= avg
+    return l
+
+
+def applySeasonalCycleFilter2d(l):
+    return l
+
+
+'''
+    Implements monthly filtering of seasonal cycles.
+'''
+
+
+def applySeasonalCycleFilter(l):
+    if len(np.shape(l)) == 1:
+        return applySeasonalCycleFilter1d(l)
+    elif len(np.shape(l)) == 2:
+        return applySeasonalCycleFilter2d(l)
+    else:
+        raise Exception("Cannot apply seasonal cycle filter: Unsupported array shape")
+
+
+def applySeasonalCycleFilterOnResultsField(results, field):
+    l = __fieldToList(results, field)
+    l = applySeasonalCycleFilter(l)
+    __listToField(results, l, field)
+
+
+def applySeasonalCycleFilterOnResults(results):
+    [applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
+
+
+'''
+http://www.nehalemlabs.net/prototype/blog/2013/04/05/an-introduction-to-smoothing-time-series-in-python-part-i-filtering-theory/
+'''
+
+
+def applyLowPassFilter(y, lowcut=12.0, order=9):
+    if len(y) - 12 <= lowcut:
+        lowcut = 3
+    nyq = 0.5 * len(y)
+    low = lowcut / nyq
+    # high = highcut / nyq
+    b, a = butter(order, low)
+    m = min([len(y), len(a), len(b)])
+    padlen = 30 if m >= 30 else m
+    fl = filtfilt(b, a, y, padlen=padlen)
+    return fl
+
+
+def applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=False, append=""):
+    x = __fieldToList(results, field)
+
+    if applySeasonal:
+        x = applySeasonalCycleFilter(x)
+    if applyLowPass:
+        x = applyLowPassFilter(x)
+    __listToField(results, x, "%s%s" % (field, append))
+
+
+def applyAllFiltersOnField(results, field, applySeasonal=True, applyLowPass=True):
+    try:
+        if applySeasonal:
+            applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=False, append="Seasonal")
+    except Exception as e:
+        # If it doesn't work log the error but ignore it
+        tb = traceback.format_exc()
+        log.warn("Error calculating Seasonal filter:\n%s" % tb)
+
+    try:
+        if applyLowPass:
+            applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=True, append="LowPass")
+    except Exception as e:
+        # If it doesn't work log the error but ignore it
+        tb = traceback.format_exc()
+        log.warn("Error calculating LowPass filter:\n%s" % tb)
+
+    try:
+        if applySeasonal and applyLowPass:
+            applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=True, append="SeasonalLowPass")
+    except Exception as e:
+        # If it doesn't work log the error but ignore it
+        tb = traceback.format_exc()
+        log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+
+
+'''
+class ResultsFilter(object):
+
+    def __init__(self):
+        pass
+
+    def filter(self, results, append, **kwargs):
+        pass
+
+
+
+class SeasonalCycleFilter(ResultsFilter):
+
+    def filter(self, results, append, **kwargs):
+        [applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
+
+if __name__ == "__main__":
+
+    foo = "bar"
+    f = ResultsFilter()
+    f.test("Tester", blah=foo)
+'''
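
For context on how these pieces compose: a minimal standalone sketch (not part of
the commit; assumes numpy is installed and the webservice package above is on the
PYTHONPATH) that de-seasons a monthly series and then low-passes it:

    import numpy as np
    from webservice.Filtering import applySeasonalCycleFilter, applyLowPassFilter

    # Synthetic 10-year monthly series: linear trend + annual cycle + noise
    months = np.arange(120)
    series = 0.01 * months + np.sin(2 * np.pi * months / 12.0) + 0.1 * np.random.randn(120)

    # Remove the mean seasonal cycle (per-calendar-month anomalies); the filter
    # mutates its input, so operate on a copy
    anomalies = applySeasonalCycleFilter(series.copy())

    # Smooth the anomalies with the Butterworth low-pass filter defined above
    smoothed = applyLowPassFilter(anomalies)
    print(np.std(smoothed) < np.std(series))  # smoothing should reduce variance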

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/LayerConfig.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/LayerConfig.py b/analysis/webservice/LayerConfig.py
new file mode 100644
index 0000000..e271b72
--- /dev/null
+++ b/analysis/webservice/LayerConfig.py
@@ -0,0 +1,61 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+
+
+ALL_LAYERS_ENABLED = True
+LAYERS = []
+LAYERS.append({"shortName":"NCDC-L4LRblend-GLOB-AVHRR_OI", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SSH_alti_1deg_1mon", "envs": ("ALL",)})
+
+
+LAYERS.append({"shortName":"SIacSubl_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"PHIBOT_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIhsnow_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIheff_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceFWflx_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceQnet_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"MXLDEPTH_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIatmQnt_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceSPflx_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceSPDep_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIarea_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"ETAN_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"sIceLoad_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"oceQsw_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SIsnPrcp_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"DETADT2_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TFLUX_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SItflux_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"SFLUX_ECCO_version4_release1", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TELLUS_GRACE_MASCON_GRID_RL05_V1_LAND", "envs": ("ALL",)})
+LAYERS.append({"shortName":"TELLUS_GRACE_MASCON_GRID_RL05_V1_OCEAN", "envs": ("ALL",)})
+LAYERS.append({"shortName":"Sea_Surface_Anomalies", "envs": ("DEV",)})
+
+LAYERS.append({"shortName":"JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1", "envs": ("ALL",)})
+
+
+def isLayerEnabled(shortName, env):
+    if ALL_LAYERS_ENABLED:
+        return True
+    if env is None:
+        env = "PROD"
+    env = env.upper()
+    if env == "DEV":
+        return True
+    for layer in LAYERS:
+        if layer["shortName"] == shortName and ("ALL" in layer["envs"] or env.upper() in layer["envs"]):
+            return True
+    return False
+
+
+if __name__ == "__main__":
+
+    print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", None)
+    print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "PROD")
+    print isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "SIT")
+
+    print isLayerEnabled("TFLUX_ECCO_version4_release1", None)
+    print isLayerEnabled("TFLUX_ECCO_version4_release1", "PROD")
+    print isLayerEnabled("TFLUX_ECCO_version4_release1", "SIT")

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/NexusHandler.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
new file mode 100644
index 0000000..1cad3cf
--- /dev/null
+++ b/analysis/webservice/NexusHandler.py
@@ -0,0 +1,555 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import sys
+import numpy as np
+import logging
+import time
+import types
+from datetime import datetime
+from netCDF4 import Dataset
+from nexustiles.nexustiles import NexusTileService
+from webservice.webmodel import NexusProcessingException
+
+AVAILABLE_HANDLERS = []
+AVAILABLE_INITIALIZERS = []
+
+
+def nexus_initializer(clazz):
+    log = logging.getLogger(__name__)
+    try:
+        wrapper = NexusInitializerWrapper(clazz)
+        log.info("Adding initializer '%s'" % wrapper.clazz())
+        AVAILABLE_INITIALIZERS.append(wrapper)
+    except Exception as ex:
+        log.warn("Initializer '%s' failed to load (reason: %s)" % (clazz, ex.message), exc_info=True)
+    return clazz
+
+
+def nexus_handler(clazz):
+    log = logging.getLogger(__name__)
+    try:
+        wrapper = AlgorithmModuleWrapper(clazz)
+        log.info("Adding algorithm module '%s' with path '%s' (%s)" % (wrapper.name(), wrapper.path(), wrapper.clazz()))
+        AVAILABLE_HANDLERS.append(wrapper)
+    except Exception as ex:
+        log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, ex.message), exc_info=True)
+    return clazz
+
+
+DEFAULT_PARAMETERS_SPEC = {
+    "ds": {
+        "name": "Dataset",
+        "type": "string",
+        "description": "One or more comma-separated dataset shortnames"
+    },
+    "minLat": {
+        "name": "Minimum Latitude",
+        "type": "float",
+        "description": "Minimum (Southern) bounding box Latitude"
+    },
+    "maxLat": {
+        "name": "Maximum Latitude",
+        "type": "float",
+        "description": "Maximum (Northern) bounding box Latitude"
+    },
+    "minLon": {
+        "name": "Minimum Longitude",
+        "type": "float",
+        "description": "Minimum (Western) bounding box Longitude"
+    },
+    "maxLon": {
+        "name": "Maximum Longitude",
+        "type": "float",
+        "description": "Maximum (Eastern) bounding box Longitude"
+    },
+    "startTime": {
+        "name": "Start Time",
+        "type": "long integer",
+        "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
+    },
+    "endTime": {
+        "name": "End Time",
+        "type": "long integer",
+        "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
+    },
+    "lowPassFilter": {
+        "name": "Apply Low Pass Filter",
+        "type": "boolean",
+        "description": "Specifies whether to apply a low pass filter on the analytics results"
+    },
+    "seasonalFilter": {
+        "name": "Apply Seasonal Filter",
+        "type": "boolean",
+        "description": "Specified whether to apply a seasonal cycle filter on the analytics results"
+    }
+}
+
+
+class NexusInitializerWrapper:
+    def __init__(self, clazz):
+        self.__log = logging.getLogger(__name__)
+        self.__hasBeenRun = False
+        self.__clazz = clazz
+        self.validate()
+
+    def validate(self):
+        if "init" not in self.__clazz.__dict__ or not type(self.__clazz.__dict__["init"]) == types.FunctionType:
+            raise Exception("Method 'init' has not been declared")
+
+    def clazz(self):
+        return self.__clazz
+
+    def hasBeenRun(self):
+        return self.__hasBeenRun
+
+    def init(self, config):
+        if not self.__hasBeenRun:
+            self.__hasBeenRun = True
+            instance = self.__clazz()
+            instance.init(config)
+        else:
+            self.log("Initializer '%s' has already been run" % self.__clazz)
+
+
+class AlgorithmModuleWrapper:
+    def __init__(self, clazz):
+        self.__instance = None
+        self.__clazz = clazz
+        self.validate()
+
+    def validate(self):
+        if "calc" not in self.__clazz.__dict__ or not type(self.__clazz.__dict__["calc"]) == types.FunctionType:
+            raise Exception("Method 'calc' has not been declared")
+
+        if "path" not in self.__clazz.__dict__:
+            raise Exception("Property 'path' has not been defined")
+
+        if "name" not in self.__clazz.__dict__:
+            raise Exception("Property 'name' has not been defined")
+
+        if "description" not in self.__clazz.__dict__:
+            raise Exception("Property 'description' has not been defined")
+
+        if "params" not in self.__clazz.__dict__:
+            raise Exception("Property 'params' has not been defined")
+
+    def clazz(self):
+        return self.__clazz
+
+    def name(self):
+        return self.__clazz.name
+
+    def path(self):
+        return self.__clazz.path
+
+    def description(self):
+        return self.__clazz.description
+
+    def params(self):
+        return self.__clazz.params
+
+    def instance(self, algorithm_config=None, sc=None):
+        if "singleton" in self.__clazz.__dict__ and self.__clazz.__dict__["singleton"] is True:
+            if self.__instance is None:
+                self.__instance = self.__clazz()
+
+                try:
+                    self.__instance.set_config(algorithm_config)
+                except AttributeError:
+                    pass
+
+                try:
+                    self.__instance.set_spark_context(sc)
+                except AttributeError:
+                    pass
+
+            return self.__instance
+        else:
+            instance = self.__clazz()
+
+            try:
+                instance.set_config(algorithm_config)
+            except AttributeError:
+                pass
+
+            try:
+                instance.set_spark_context(sc)
+            except AttributeError:
+                pass
+            return instance
+
+    def isValid(self):
+        try:
+            self.validate()
+            return True
+        except Exception as ex:
+            return False
+
+
+class CalcHandler(object):
+    def calc(self, computeOptions, **args):
+        raise Exception("calc() not yet implemented")
+
+
+class NexusHandler(CalcHandler):
+    def __init__(self, skipCassandra=False, skipSolr=False):
+        CalcHandler.__init__(self)
+
+        self.algorithm_config = None
+        self._tile_service = NexusTileService(skipCassandra, skipSolr)
+
+    def set_config(self, algorithm_config):
+        self.algorithm_config = algorithm_config
+
+    def _mergeDicts(self, x, y):
+        z = x.copy()
+        z.update(y)
+        return z
+
+    def _now(self):
+        millis = int(round(time.time() * 1000))
+        return millis
+
+    def _mergeDataSeries(self, resultsData, dataNum, resultsMap):
+
+        for entry in resultsData:
+
+            #frmtdTime = datetime.fromtimestamp(entry["time"] ).strftime("%Y-%m")
+            frmtdTime = entry["time"]
+
+            if frmtdTime not in resultsMap:
+                resultsMap[frmtdTime] = []
+            entry["ds"] = dataNum
+            resultsMap[frmtdTime].append(entry)
+
+    def _resultsMapToList(self, resultsMap):
+        resultsList = []
+        for key, value in resultsMap.iteritems():
+            resultsList.append(value)
+
+        resultsList = sorted(resultsList, key=lambda entry: entry[0]["time"])
+        return resultsList
+
+    def _mergeResults(self, resultsRaw):
+        resultsMap = {}
+
+        for i in range(0, len(resultsRaw)):
+            resultsSeries = resultsRaw[i]
+            resultsData = resultsSeries[0]
+            self._mergeDataSeries(resultsData, i, resultsMap)
+
+        resultsList = self._resultsMapToList(resultsMap)
+        return resultsList
+
+
+class SparkHandler(NexusHandler):
+    class SparkJobContext(object):
+
+        class MaxConcurrentJobsReached(Exception):
+            def __init__(self, *args, **kwargs):
+                Exception.__init__(self, *args, **kwargs)
+
+        def __init__(self, job_stack):
+            self.spark_job_stack = job_stack
+            self.job_name = None
+            self.log = logging.getLogger(__name__)
+
+        def __enter__(self):
+            try:
+                self.job_name = self.spark_job_stack.pop()
+                self.log.debug("Using %s" % self.job_name)
+            except IndexError:
+                raise SparkHandler.SparkJobContext.MaxConcurrentJobsReached()
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if self.job_name is not None:
+                self.log.debug("Returning %s" % self.job_name)
+                self.spark_job_stack.append(self.job_name)
+
+    def __init__(self, **kwargs):
+        import inspect
+        NexusHandler.__init__(self, **kwargs)
+        self.log = logging.getLogger(__name__)
+        self._sc = None
+
+        self.spark_job_stack = []
+
+        def with_spark_job_context(calc_func):
+            from functools import wraps
+
+            @wraps(calc_func)
+            def wrapped(*args, **kwargs1):
+                try:
+                    with SparkHandler.SparkJobContext(self.spark_job_stack) as job_context:
+                        # TODO Pool and Job are forced to a 1-to-1 relationship
+                        calc_func.im_self._sc.setLocalProperty("spark.scheduler.pool", job_context.job_name)
+                        calc_func.im_self._sc.setJobGroup(job_context.job_name, "a spark job")
+                        return calc_func(*args, **kwargs1)
+                except SparkHandler.SparkJobContext.MaxConcurrentJobsReached:
+                    raise NexusProcessingException(code=503,
+                                                   reason="Max concurrent requests reached. Please try again later.")
+
+            return wrapped
+
+        for member in inspect.getmembers(self, predicate=inspect.ismethod):
+            if member[0] == "calc":
+                setattr(self, member[0], with_spark_job_context(member[1]))
+
+    def set_spark_context(self, sc):
+        self._sc = sc
+
+    def set_config(self, algorithm_config):
+        max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
+            "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+        self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)])
+        self.algorithm_config = algorithm_config
+
+    def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
+                        start_year=None, end_year=None, clim_month=None,
+                        fill=-9999., spark_master=None, spark_nexecs=None,
+                        spark_nparts=None):
+        self._ds = ds
+        self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
+        self._startTime = start_time
+        self._endTime = end_time
+        self._startYear = start_year
+        self._endYear = end_year
+        self._climMonth = clim_month
+        self._fill = fill
+        self._spark_master = spark_master
+        self._spark_nexecs = spark_nexecs
+        self._spark_nparts = spark_nparts
+
+    def _find_global_tile_set(self):
+        if isinstance(self._ds, (list, tuple)):
+            ds = self._ds[0]
+        else:
+            ds = self._ds
+        ntiles = 0
+        ##################################################################
+        # Temporary workaround until we have dataset metadata to indicate
+        # temporal resolution.
+        if "monthly" in ds.lower():
+            t_incr = 2592000 # 30 days
+        else:
+            t_incr = 86400 # 1 day
+        ##################################################################
+        t = self._endTime
+        self._latRes = None
+        self._lonRes = None
+        while ntiles == 0:
+            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=ds, start_time=t-t_incr, end_time=t)
+            ntiles = len(nexus_tiles)
+            self.log.debug('find_global_tile_set got {0} tiles'.format(ntiles))
+            if ntiles > 0:
+                for tile in nexus_tiles:
+                    self.log.debug('tile coords:')
+                    self.log.debug('tile lats: {0}'.format(tile.latitudes))
+                    self.log.debug('tile lons: {0}'.format(tile.longitudes))
+                    if self._latRes is None:
+                        lats = tile.latitudes.data
+                        if (len(lats) > 1):
+                            self._latRes = abs(lats[1]-lats[0])
+                    if self._lonRes is None:
+                        lons = tile.longitudes.data
+                        if (len(lons) > 1):
+                            self._lonRes = abs(lons[1]-lons[0])
+                    if ((self._latRes is not None) and 
+                        (self._lonRes is not None)):
+                        break
+                if (self._latRes is None) or (self._lonRes is None):
+                    ntiles = 0
+                else:
+                    lats_agg = np.concatenate([tile.latitudes.compressed()
+                                               for tile in nexus_tiles])
+                    lons_agg = np.concatenate([tile.longitudes.compressed()
+                                               for tile in nexus_tiles])
+                    self._minLatCent = np.min(lats_agg)
+                    self._maxLatCent = np.max(lats_agg)
+                    self._minLonCent = np.min(lons_agg)
+                    self._maxLonCent = np.max(lons_agg)
+            t -= t_incr
+        return nexus_tiles
+
+    def _find_tile_bounds(self, t):
+        lats = t.latitudes
+        lons = t.longitudes
+        if (len(lats.compressed()) > 0) and (len(lons.compressed()) > 0):
+            min_lat = np.ma.min(lats)
+            max_lat = np.ma.max(lats)
+            min_lon = np.ma.min(lons)
+            max_lon = np.ma.max(lons)
+            good_inds_lat = np.where(lats.mask == False)[0]
+            good_inds_lon = np.where(lons.mask == False)[0]
+            min_y = np.min(good_inds_lat)
+            max_y = np.max(good_inds_lat)
+            min_x = np.min(good_inds_lon)
+            max_x = np.max(good_inds_lon)
+            bounds = (min_lat, max_lat, min_lon, max_lon,
+                      min_y, max_y, min_x, max_x)
+        else:
+            self.log.warn('Nothing in this tile!')
+            bounds = None
+        return bounds
+        
+    @staticmethod
+    def query_by_parts(tile_service, min_lat, max_lat, min_lon, max_lon, 
+                       dataset, start_time, end_time, part_dim=0):
+        nexus_max_tiles_per_query = 100
+        #print 'trying query: ',min_lat, max_lat, min_lon, max_lon, \
+        #    dataset, start_time, end_time
+        try:
+            tiles = \
+                tile_service.find_tiles_in_box(min_lat, max_lat, 
+                                               min_lon, max_lon, 
+                                               dataset, 
+                                               start_time=start_time, 
+                                               end_time=end_time,
+                                               fetch_data=False)
+            assert(len(tiles) <= nexus_max_tiles_per_query)
+        except Exception:
+            #print 'failed query: ',min_lat, max_lat, min_lon, max_lon, \
+            #    dataset, start_time, end_time
+            if part_dim == 0: 
+                # Partition by latitude.
+                mid_lat = (min_lat + max_lat) / 2
+                nexus_tiles = SparkHandler.query_by_parts(tile_service, 
+                                                          min_lat, mid_lat, 
+                                                          min_lon, max_lon, 
+                                                          dataset, 
+                                                          start_time, end_time,
+                                                          part_dim=part_dim)
+                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service, 
+                                                               mid_lat, 
+                                                               max_lat, 
+                                                               min_lon, 
+                                                               max_lon, 
+                                                               dataset, 
+                                                               start_time, 
+                                                               end_time,
+                                                               part_dim=part_dim))
+            elif part_dim == 1: 
+                # Partition by longitude.
+                mid_lon = (min_lon + max_lon) / 2
+                nexus_tiles = SparkHandler.query_by_parts(tile_service, 
+                                                          min_lat, max_lat, 
+                                                          min_lon, mid_lon, 
+                                                          dataset, 
+                                                          start_time, end_time,
+                                                          part_dim=part_dim)
+                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service, 
+                                                               min_lat, 
+                                                               max_lat, 
+                                                               mid_lon, 
+                                                               max_lon, 
+                                                               dataset, 
+                                                               start_time, 
+                                                               end_time,
+                                                               part_dim=part_dim))
+            elif part_dim == 2:
+                # Partition by time.
+                mid_time = (start_time + end_time) / 2
+                nexus_tiles = SparkHandler.query_by_parts(tile_service, 
+                                                          min_lat, max_lat, 
+                                                          min_lon, max_lon, 
+                                                          dataset, 
+                                                          start_time, mid_time,
+                                                          part_dim=part_dim)
+                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service, 
+                                                               min_lat, 
+                                                               max_lat, 
+                                                               min_lon, 
+                                                               max_lon, 
+                                                               dataset, 
+                                                               mid_time, 
+                                                               end_time,
+                                                               part_dim=part_dim))
+        else:
+            # No exception, so query Cassandra for the tile data.
+            #print 'Making NEXUS query to Cassandra for %d tiles...' % \
+            #    len(tiles)
+            #t1 = time.time()
+            #print 'NEXUS call start at time %f' % t1
+            #sys.stdout.flush()
+            nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
+            nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+                                                               min_lon, max_lon,
+                                                               nexus_tiles))
+            #t2 = time.time()
+            #print 'NEXUS call end at time %f' % t2
+            #print 'Seconds in NEXUS call: ', t2-t1
+            #sys.stdout.flush()
+
+        #print 'Returning %d tiles' % len(nexus_tiles)
+        return nexus_tiles
+
+    @staticmethod
+    def _prune_tiles(nexus_tiles):
+        del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0]
+        for i in np.flipud(del_ind):
+            del nexus_tiles[i]
+
+    def _lat2ind(self,lat):
+        return int((lat-self._minLatCent)/self._latRes)
+
+    def _lon2ind(self,lon):
+        return int((lon-self._minLonCent)/self._lonRes)
+
+    def _ind2lat(self,y):
+        return self._minLatCent+y*self._latRes
+
+    def _ind2lon(self,x):
+        return self._minLonCent+x*self._lonRes
+
+    def _create_nc_file_time1d(self, a, fname, varname, varunits=None,
+                               fill=None):
+        self.log.debug('a={0}'.format(a))
+        self.log.debug('shape a = {0}'.format(a.shape))
+        assert len(a.shape) == 1
+        time_dim = len(a)
+        rootgrp = Dataset(fname, "w", format="NETCDF4")
+        rootgrp.createDimension("time", time_dim)
+        vals = rootgrp.createVariable(varname, "f4", dimensions=("time",),
+                                      fill_value=fill)
+        times = rootgrp.createVariable("time", "f4", dimensions=("time",))
+        vals[:] = [d['mean'] for d in a]
+        times[:] = [d['time'] for d in a]
+        if varunits is not None:
+            vals.units = varunits
+        times.units = 'seconds since 1970-01-01 00:00:00'
+        rootgrp.close()
+
+    def _create_nc_file_latlon2d(self, a, fname, varname, varunits=None,
+                                 fill=None):
+        self.log.debug('a={0}'.format(a))
+        self.log.debug('shape a = {0}'.format(a.shape))
+        assert len(a.shape) == 2
+        lat_dim, lon_dim = a.shape
+        rootgrp = Dataset(fname, "w", format="NETCDF4")
+        rootgrp.createDimension("lat", lat_dim)
+        rootgrp.createDimension("lon", lon_dim)
+        vals = rootgrp.createVariable(varname, "f4",
+                                      dimensions=("lat","lon",),
+                                      fill_value=fill)
+        lats = rootgrp.createVariable("lat", "f4", dimensions=("lat",))
+        lons = rootgrp.createVariable("lon", "f4", dimensions=("lon",))
+        vals[:,:] = a
+        lats[:] = np.linspace(self._minLatCent, 
+                              self._maxLatCent, lat_dim)
+        lons[:] = np.linspace(self._minLonCent,
+                              self._maxLonCent, lon_dim)
+        if varunits is not None:
+            vals.units = varunits
+        lats.units = "degrees north"
+        lons.units = "degrees east"
+        rootgrp.close()
+
+    def _create_nc_file(self, a, fname, varname, **kwargs):
+        self._create_nc_file_latlon2d(a, fname, varname, **kwargs)
+
+
+def executeInitializers(config):
+    [wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
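
For orientation, the contract enforced by AlgorithmModuleWrapper.validate() above
(class attributes name, path, description and params, plus a calc() method) can be
satisfied by a minimal handler such as the following sketch; EchoHandlerImpl is a
hypothetical example, not part of this commit:

    import json

    from webservice.NexusHandler import CalcHandler, nexus_handler


    @nexus_handler
    class EchoHandlerImpl(CalcHandler):
        # validate() checks for these four class attributes and a calc() function
        name = "Echo"
        path = "/echo"
        description = "Echoes back the requested dataset shortname(s)"
        params = {}
        singleton = True  # optional: AlgorithmModuleWrapper will reuse one instance

        def calc(self, computeOptions, **args):
            # Same lightweight result pattern used by DataSeriesList.py below
            class SimpleResult(object):
                def __init__(self, result):
                    self.result = result

                def toJson(self):
                    return json.dumps(self.result)

            return SimpleResult(computeOptions.get_dataset())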

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/__init__.py b/analysis/webservice/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/Capabilities.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py
new file mode 100644
index 0000000..0477e67
--- /dev/null
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -0,0 +1,43 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+
+from webservice.NexusHandler import CalcHandler, nexus_handler, AVAILABLE_HANDLERS
+from webservice.webmodel import NexusResults
+
+
+@nexus_handler
+class CapabilitiesListHandlerImpl(CalcHandler):
+    name = "Capabilities"
+    path = "/capabilities"
+    description = "Lists the current capabilities of this Nexus system"
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        CalcHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        capabilities = []
+
+        for capability in AVAILABLE_HANDLERS:
+            capabilityDef = {
+                "name": capability.name(),
+                "path": capability.path(),
+                "description": capability.description(),
+                "parameters": capability.params()
+            }
+            capabilities.append(capabilityDef)
+
+        return CapabilitiesResults(capabilities)
+
+
+class CapabilitiesResults(NexusResults):
+    def __init__(self, capabilities):
+        NexusResults.__init__(self)
+        self.__capabilities = capabilities
+
+    def toJson(self):
+        return json.dumps(self.__capabilities, indent=4)

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/CorrelationMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py
new file mode 100644
index 0000000..5064a7f
--- /dev/null
+++ b/analysis/webservice/algorithms/CorrelationMap.py
@@ -0,0 +1,129 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+import math
+import numpy as np
+from shapely.geometry import box
+from scipy.stats import linregress
+from itertools import groupby
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults
+from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
+
+
+@nexus_handler
+class LongitudeLatitudeMapHandlerImpl(NexusHandler):
+    name = "Correlation Map"
+    path = "/correlationMap"
+    description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
+    params = dict(DEFAULT_PARAMETERS_SPEC)  # copy so the shared default spec is not mutated below
+    params["res"] = {
+        "name": "Resolution",
+        "type": "float",
+        "description": "The resolution of the resulting correlation map"
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        minLat = computeOptions.get_min_lat()
+        maxLat = computeOptions.get_max_lat()
+        minLon = computeOptions.get_min_lon()
+        maxLon = computeOptions.get_max_lon()
+        ds = computeOptions.get_dataset()
+        startTime = computeOptions.get_start_time()
+        endTime = computeOptions.get_end_time()
+        resolution = computeOptions.get_decimal_arg("res", default=1.0)
+
+        if len(ds) != 2:
+            raise Exception("Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2")
+
+        ds1tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[0], startTime,
+                                                            endTime)
+        ds2tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[1], startTime,
+                                                            endTime)
+
+        matches = self._match_tiles(ds1tiles, ds2tiles)
+
+        if len(matches) == 0:
+            raise NexusProcessingException(reason="Could not find any data temporally co-located")
+
+        results = [[{
+                        'cnt': 0,
+                        'slope': 0,
+                        'intercept': 0,
+                        'r': 0,
+                        'p': 0,
+                        'stderr': 0,
+                        'lat': float(lat),
+                        'lon': float(lon)
+                    } for lon in np.arange(minLon, maxLon, resolution)] for lat in
+                   np.arange(minLat, maxLat, resolution)]
+
+        for stats in results:
+            for stat in stats:
+                values_x = []
+                values_y = []
+                for tile_matches in matches:
+
+                    tile_1_list = tile_matches[0]
+                    value_1 = get_approximate_value_for_lat_lon(tile_1_list, stat["lat"], stat["lon"])
+
+                    tile_2_list = tile_matches[1]
+                    value_2 = get_approximate_value_for_lat_lon(tile_2_list, stat["lat"], stat["lon"])
+
+                    if not (math.isnan(value_1) or math.isnan(value_2)):
+                        values_x.append(value_1)
+                        values_y.append(value_2)
+
+                if len(values_x) > 2 and len(values_y) > 2:
+                    regression = linregress(values_x, values_y)
+
+                    stat["slope"] = regression[0] if not math.isnan(regression[0]) and not math.isinf(regression[0]) else str(regression[0])
+                    stat["intercept"] = regression[1] if not math.isnan(regression[1]) and not math.isinf(regression[1]) else str(
+                        regression[1])
+                    stat["r"] = regression[2] if not math.isnan(regression[2]) and not math.isinf(regression[2]) else str(regression[2])
+                    stat["p"] = regression[3] if not math.isnan(regression[3]) and not math.isinf(regression[3]) else str(regression[3])
+                    stat["stderr"] = regression[4] if not math.isnan(regression[4]) and not math.isinf(regression[4]) else str(
+                        regression[4])
+                    stat["cnt"] = len(values_x)
+
+        return CorrelationResults(results)
+
+    def _match_tiles(self, tiles_1, tiles_2):
+
+        date_map = {}
+
+        tiles_1 = sorted(tiles_1, key=lambda tile: tile.min_time)
+        for tile_start_time, tiles in groupby(tiles_1, lambda tile: tile.min_time):
+            if tile_start_time not in date_map:
+                date_map[tile_start_time] = []
+            date_map[tile_start_time].append(list(tiles))
+
+        tiles_2 = sorted(tiles_2, key=lambda tile: tile.min_time)
+        for tile_start_time, tiles in groupby(tiles_2, lambda tile: tile.min_time):
+            if tile_start_time not in date_map:
+                date_map[tile_start_time] = []
+            date_map[tile_start_time].append(list(tiles))
+
+        matches = [tiles for a_time, tiles in date_map.iteritems() if len(tiles) == 2]
+
+        return matches
+
+
+class CorrelationResults(NexusResults):
+    def __init__(self, results):
+        NexusResults.__init__(self)
+        self.results = results
+
+    def toJson(self):
+        json_d = {
+            "stats": {},
+            "meta": [None, None],
+            "data": self.results
+        }
+        return json.dumps(json_d, indent=4)
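
The per-cell statistics above are simply the 5-tuple returned by
scipy.stats.linregress: (slope, intercept, r-value, p-value, standard error).
A quick standalone illustration with made-up numbers:

    from scipy.stats import linregress

    values_x = [1.0, 2.0, 3.0, 4.0, 5.0]
    values_y = [2.1, 3.9, 6.2, 8.1, 9.8]

    # slope/intercept describe the fitted line; r, p and stderr its quality
    slope, intercept, r, p, stderr = linregress(values_x, values_y)
    print(slope)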

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DailyDifferenceAverage.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
new file mode 100644
index 0000000..1518c9e
--- /dev/null
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -0,0 +1,230 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import sys
+import traceback
+from datetime import datetime, timedelta
+from multiprocessing.dummy import Pool, Manager
+from shapely.geometry import box
+
+import numpy as np
+import pytz
+from nexustiles.nexustiles import NexusTileService, NexusTileServiceException
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+
+
+@nexus_handler
+class DailyDifferenceAverageImpl(NexusHandler):
+    name = "Daily Difference Average"
+    path = "/dailydifferenceaverage"
+    description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
+    params = {
+        "ds1": {
+            "name": "Dataset 1",
+            "type": "string",
+            "description": "The first Dataset shortname to use in calculation"
+        },
+        "ds2": {
+            "name": "Dataset 2",
+            "type": "string",
+            "description": "The second Dataset shortname to use in calculation"
+        },
+        "minLon": {
+            "name": "Minimum Longitude",
+            "type": "float",
+            "description": "Minimum (Western) bounding box Longitude"
+        },
+        "minLat": {
+            "name": "Minimum Latitude",
+            "type": "float",
+            "description": "Minimum (Southern) bounding box Latitude"
+        },
+        "maxLon": {
+            "name": "Maximum Longitude",
+            "type": "float",
+            "description": "Maximum (Eastern) bounding box Longitude"
+        },
+        "maxLat": {
+            "name": "Maximum Latitude",
+            "type": "float",
+            "description": "Maximum (Northern) bounding box Latitude"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "long integer",
+            "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "long integer",
+            "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+
+    def calc(self, request, **args):
+        min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
+        dataset1 = request.get_argument("ds1", None)
+        dataset2 = request.get_argument("ds2", None)
+        start_time = request.get_start_time()
+        end_time = request.get_end_time()
+
+        simple = request.get_argument("simple", None) is not None
+
+        averagebyday = self.get_daily_difference_average_for_box(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+                                                                 start_time, end_time)
+
+        averagebyday = sorted(averagebyday, key=lambda dayavg: dayavg[0])
+
+        if simple:
+
+            import matplotlib.pyplot as plt
+            from matplotlib.dates import date2num
+
+            times = [date2num(self.date_from_ms(dayavg[0])) for dayavg in averagebyday]
+            means = [dayavg[1] for dayavg in averagebyday]
+            plt.plot_date(times, means, ls='solid')
+
+            plt.xlabel('Date')
+            plt.xticks(rotation=70)
+            plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
+            plt.title('Sea Surface Temperature (SST) Anomalies')
+            plt.grid(True)
+            plt.tight_layout()
+            plt.savefig("test.png")
+
+            return averagebyday, None, None
+        else:
+
+            result = NexusResults(
+                results=[[{'time': dayms, 'mean': avg, 'ds': 0}] for dayms, avg in averagebyday],
+                stats={},
+                meta=self.get_meta())
+
+            result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+            result.meta()['label'] = u'Difference from 5-Day mean (\u00B0C)'
+
+            return result
+
+    def date_from_ms(self, dayms):
+        base_datetime = datetime(1970, 1, 1)
+        delta = timedelta(0, 0, 0, dayms)
+        return base_datetime + delta
+
+    def get_meta(self):
+        meta = {
+            "title": "Sea Surface Temperature (SST) Anomalies",
+            "description": "SST anomolies are departures from the 5-day pixel mean",
+            "units": u'\u00B0C',
+        }
+        return meta
+
+    def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+                                             start_time,
+                                             end_time):
+
+        daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1,
+                                                                start_time, end_time)
+
+        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+        if maxprocesses == 1:
+            calculator = DailyDifferenceAverageCalculator()
+            averagebyday = []
+            for dayinseconds in daysinrange:
+                result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
+                                                             dayinseconds)
+                averagebyday.append((result[0], result[1]))
+        else:
+            # Create a task to calc average difference for each day
+            manager = Manager()
+            work_queue = manager.Queue()
+            done_queue = manager.Queue()
+            for dayinseconds in daysinrange:
+                work_queue.put(
+                    ('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds))
+            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+            # Start new processes to handle the work
+            pool = Pool(maxprocesses)
+            [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+            pool.close()
+
+            # Collect the results as [(day (in ms), average difference for that day)]
+            averagebyday = []
+            for i in xrange(0, len(daysinrange)):
+                result = done_queue.get()
+                if result[0] == 'error':
+                    print >> sys.stderr, result[1]
+                    raise NexusProcessingException(reason="Error calculating average by day.")
+                rdata = result
+                averagebyday.append((rdata[0], rdata[1]))
+
+            pool.terminate()
+            manager.shutdown()
+
+        return averagebyday
+
+
+class DailyDifferenceAverageCalculator(object):
+    def __init__(self):
+        self.__tile_service = NexusTileService()
+
+    def calc_average_diff_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, timeinseconds):
+
+        day_of_year = datetime.fromtimestamp(timeinseconds, pytz.utc).timetuple().tm_yday
+
+        ds1_nexus_tiles = self.__tile_service.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon,
+                                                                            dataset1,
+                                                                            timeinseconds)
+
+        # Initialize list of differences
+        differences = []
+        # For each ds1tile
+        for ds1_tile in ds1_nexus_tiles:
+            # Get tile for ds2 using bbox from ds1_tile and day ms
+            try:
+                ds2_tile = self.__tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
+                    box(ds1_tile.bbox.min_lon, ds1_tile.bbox.min_lat, ds1_tile.bbox.max_lon, ds1_tile.bbox.max_lat),
+                    dataset2, day_of_year)[0]
+                # Subtract ds2 tile from ds1 tile
+                diff = np.subtract(ds1_tile.data, ds2_tile.data)
+            except NexusTileServiceException:
+                # This happens when there is data in ds1tile but all NaNs in ds2tile because the
+                # Solr query being used filters out results where stats_count = 0.
+                # Technically, this should never happen if ds2 is a climatology generated in part
+                # from ds1; if it does, it probably indicates a data error.
+
+                # For now, just treat ds2 as an array of all masked data (which essentially discards the ds1 data)
+                ds2_tile = np.ma.masked_all(ds1_tile.data.shape)
+                diff = np.subtract(ds1_tile.data, ds2_tile)
+
+            # Put results in list of differences
+            differences.append(np.ma.array(diff).ravel())
+
+        # Average List of differences
+        diffaverage = np.ma.mean(differences).item()
+        # Return Average by day
+        return int(timeinseconds), diffaverage
+
+
+def pool_worker(work_queue, done_queue):
+    try:
+        calculator = DailyDifferenceAverageCalculator()
+
+        for work in iter(work_queue.get, SENTINEL):
+            scifunction = work[0]
+            args = work[1:]
+            result = calculator.__getattribute__(scifunction)(*args)
+            done_queue.put(result)
+    except Exception as e:
+        e_str = traceback.format_exc()
+        done_queue.put(('error', e_str))
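
Stripped of the NEXUS specifics, pool_worker() above is the standard
sentinel-terminated queue pattern. A self-contained sketch of the same idea
(hypothetical squaring "work", thread-backed multiprocessing.dummy as in the file):

    from multiprocessing.dummy import Pool, Manager

    SENTINEL = 'STOP'
    NWORKERS = 4

    def pool_worker(work_queue, done_queue):
        # Pull work items until the SENTINEL appears, then exit
        for work in iter(work_queue.get, SENTINEL):
            done_queue.put(work * work)

    if __name__ == '__main__':
        manager = Manager()
        work_queue, done_queue = manager.Queue(), manager.Queue()
        jobs = list(range(10))
        for n in jobs:
            work_queue.put(n)
        for _ in range(NWORKERS):
            work_queue.put(SENTINEL)  # one sentinel per worker so every worker stops
        pool = Pool(NWORKERS)
        for _ in range(NWORKERS):
            pool.apply_async(pool_worker, (work_queue, done_queue))
        pool.close()
        results = sorted(done_queue.get() for _ in jobs)
        pool.terminate()
        print(results)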

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DataInBoundsSearch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py
new file mode 100644
index 0000000..f9aa609
--- /dev/null
+++ b/analysis/webservice/algorithms/DataInBoundsSearch.py
@@ -0,0 +1,205 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import logging
+from datetime import datetime
+from pytz import timezone
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DataInBoundsSearchHandlerImpl(NexusHandler):
+    name = "Data In-Bounds Search"
+    path = "/datainbounds"
+    description = "Fetches point values for a given dataset and geographical area"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "The Dataset shortname to use in calculation. Required"
+        },
+        "parameter": {
+            "name": "Parameter",
+            "type": "string",
+            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        try:
+            ds = request.get_dataset()[0]
+        except Exception:
+            raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+        parameter_s = request.get_argument('parameter', None)
+        if parameter_s not in ['sst', 'sss', 'wind', None]:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+            start_time = long((start_time - EPOCH).total_seconds())
+        except Exception:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+            end_time = long((end_time - EPOCH).total_seconds())
+        except Exception:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except Exception:
+            raise NexusProcessingException(
+                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                code=400)
+
+        return ds, parameter_s, start_time, end_time, bounding_polygon
+
+    def calc(self, computeOptions, **args):
+        ds, parameter, start_time, end_time, bounding_polygon = self.parse_arguments(computeOptions)
+
+        includemeta = computeOptions.get_include_meta()
+
+        min_lat = bounding_polygon.bounds[1]
+        max_lat = bounding_polygon.bounds[3]
+        min_lon = bounding_polygon.bounds[0]
+        max_lon = bounding_polygon.bounds[2]
+
+        tiles = self._tile_service.get_tiles_bounded_by_box(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+                                                            end_time)
+
+        data = []
+        for tile in tiles:
+            for nexus_point in tile.nexus_point_generator():
+
+                point = dict()
+                point['id'] = tile.tile_id
+
+                if parameter == 'sst':
+                    point['sst'] = nexus_point.data_val
+                elif parameter == 'sss':
+                    point['sss'] = nexus_point.data_val
+                elif parameter == 'wind':
+                    point['wind_u'] = nexus_point.data_val
+                    try:
+                        point['wind_v'] = tile.meta_data['wind_v'][tuple(nexus_point.index)]
+                    except (KeyError, IndexError):
+                        pass
+                    try:
+                        point['wind_direction'] = tile.meta_data['wind_dir'][tuple(nexus_point.index)]
+                    except (KeyError, IndexError):
+                        pass
+                    try:
+                        point['wind_speed'] = tile.meta_data['wind_speed'][tuple(nexus_point.index)]
+                    except (KeyError, IndexError):
+                        pass
+                else:
+                    pass
+
+                data.append({
+                    'latitude': nexus_point.latitude,
+                    'longitude': nexus_point.longitude,
+                    'time': nexus_point.time,
+                    'data': [
+                        point
+                    ]
+                })
+
+        if includemeta and len(tiles) > 0:
+            meta = [tile.get_summary() for tile in tiles]
+        else:
+            meta = None
+
+        result = DataInBoundsResult(
+            results=data,
+            stats={},
+            meta=meta)
+
+        result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+
+        return result
+
+
+class DataInBoundsResult(NexusResults):
+    def toCSV(self):
+        rows = []
+
+        headers = [
+            "id",
+            "lon",
+            "lat",
+            "time"
+        ]
+
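+        # Value columns are derived from the first result row; every row is assumed to carry the same parameter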
+        for i, result in enumerate(self.results()):
+            cols = []
+
+            cols.append(str(result['data'][0]['id']))
+            cols.append(str(result['longitude']))
+            cols.append(str(result['latitude']))
+            cols.append(datetime.utcfromtimestamp(result["time"]).strftime('%Y-%m-%dT%H:%M:%SZ'))
+            if 'sst' in result['data'][0]:
+                cols.append(str(result['data'][0]['sst']))
+                if i == 0:
+                    headers.append("sea_water_temperature")
+            elif 'sss' in result['data'][0]:
+                cols.append(str(result['data'][0]['sss']))
+                if i == 0:
+                    headers.append("sea_water_salinity")
+            elif 'wind_u' in result['data'][0]:
+                cols.append(str(result['data'][0]['wind_u']))
+                cols.append(str(result['data'][0].get('wind_v', '')))
+                cols.append(str(result['data'][0].get('wind_direction', '')))
+                cols.append(str(result['data'][0].get('wind_speed', '')))
+                if i == 0:
+                    headers.append("eastward_wind")
+                    headers.append("northward_wind")
+                    headers.append("wind_direction")
+                    headers.append("wind_speed")
+
+            if i == 0:
+                rows.append(",".join(headers))
+            rows.append(",".join(cols))
+
+        return "\r\n".join(rows)

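For reference, DataInBoundsResult.toCSV above derives its value columns from the
first result row (headers are appended only when i == 0), so every row is
expected to carry the same parameter. A standalone sketch of that
header-on-first-row pattern, using hypothetical sample rows rather than the
webservice classes:

    results = [
        {'latitude': -29.5, 'longitude': 15.5, 'time': 1104537600,
         'data': [{'id': 'tile-1', 'sst': 290.1}]},
        {'latitude': -29.0, 'longitude': 15.5, 'time': 1104537600,
         'data': [{'id': 'tile-1', 'sst': 290.4}]},
    ]

    rows = []
    headers = ["id", "lon", "lat", "time"]
    for i, result in enumerate(results):
        point = result['data'][0]
        cols = [str(point['id']), str(result['longitude']),
                str(result['latitude']), str(result['time'])]
        if 'sst' in point:
            cols.append(str(point['sst']))
            if i == 0:
                headers.append("sea_water_temperature")
        if i == 0:
            rows.append(",".join(headers))
        rows.append(",".join(cols))

    print("\r\n".join(rows))
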
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DataSeriesList.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py
new file mode 100644
index 0000000..399742a
--- /dev/null
+++ b/analysis/webservice/algorithms/DataSeriesList.py
@@ -0,0 +1,30 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+from webservice.NexusHandler import NexusHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import cached
+
+
+@nexus_handler
+class DataSeriesListHandlerImpl(NexusHandler):
+    name = "Dataset List"
+    path = "/list"
+    description = "Lists datasets currently available for analysis"
+    params = {}
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+
+    @cached(ttl=(60 * 60 * 1000))  # 1 hour cached
+    def calc(self, computeOptions, **args):
+        class SimpleResult(object):
+            def __init__(self, result):
+                self.result = result
+
+            def toJson(self):
+                return json.dumps(self.result)
+
+        return SimpleResult(self._tile_service.get_dataseries_list())

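The @cached decorator above comes from webservice.webmodel, which is not part
of this file. A minimal sketch of a TTL-style memoizing decorator along the
same lines (the real implementation may differ, e.g. in how it keys on
arguments; this one caches a single value):

    import functools
    import time

    def cached(ttl=1000):
        # Memoize the wrapped callable's result for ttl milliseconds
        def decorator(func):
            state = {'value': None, 'expires': 0.0}

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                now = time.time()
                if now >= state['expires']:
                    state['value'] = func(*args, **kwargs)
                    state['expires'] = now + ttl / 1000.0
                return state['value']
            return wrapper
        return decorator
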
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DelayTest.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py
new file mode 100644
index 0000000..85bdb8c
--- /dev/null
+++ b/analysis/webservice/algorithms/DelayTest.py
@@ -0,0 +1,29 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import time
+
+from webservice.NexusHandler import CalcHandler
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class DelayHandlerImpl(CalcHandler):
+    name = "Delay"
+    path = "/delay"
+    description = "Waits a little while"
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        CalcHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        time.sleep(10)
+
+        class SimpleResult(object):
+            def toJson(self):
+                return ""
+
+        return SimpleResult()

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/ErrorTosserTest.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py
new file mode 100644
index 0000000..3f791d5
--- /dev/null
+++ b/analysis/webservice/algorithms/ErrorTosserTest.py
@@ -0,0 +1,23 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+from webservice.NexusHandler import CalcHandler, nexus_handler
+
+
+@nexus_handler
+class ErrorTosserHandler(CalcHandler):
+    name = "MakeError"
+    path = "/makeerror"
+    description = "Causes an error"
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        CalcHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        a = 100 / 0.0
+        # raise Exception("I'm Mad!")
+        # raise NexusProcessingException.NexusProcessingException(NexusProcessingException.StandardNexusErrors.UNKNOWN, "I'm Mad!")
+        return {}, None, None

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/Heartbeat.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py
new file mode 100644
index 0000000..a462739
--- /dev/null
+++ b/analysis/webservice/algorithms/Heartbeat.py
@@ -0,0 +1,40 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, AVAILABLE_HANDLERS
+from webservice.webmodel import NexusResults
+
+
+@nexus_handler
+class HeartbeatHandlerImpl(NexusHandler):
+    name = "Backend Services Status"
+    path = "/heartbeat"
+    description = "Returns health status of Nexus backend services"
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+
+    def calc(self, computeOptions, **args):
+        solrOnline = self._tile_service.pingSolr()
+
+        # Not sure how best to check Cassandra cluster status, so just return True for now
+        cassOnline = True
+
+        if solrOnline and cassOnline:
+            status = {"online": True}
+        else:
+            status = {"online": False}
+
+        class SimpleResult(object):
+            def __init__(self, result):
+                self.result = result
+
+            def toJson(self):
+                return json.dumps(self.result)
+
+        return SimpleResult(status)

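A hypothetical client-side check against the handler above. The base URL is an
assumption (no host or port is defined in this commit), but the response shape
follows the JSON built in calc:

    import json
    import urllib2  # Python 2, matching this codebase

    def nexus_online(base_url='http://localhost:8083'):
        # True when GET /heartbeat returns {"online": true}
        try:
            body = urllib2.urlopen(base_url + '/heartbeat', timeout=5).read()
            return json.loads(body).get('online', False)
        except Exception:
            return False
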
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/HofMoeller.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py
new file mode 100644
index 0000000..98cff8d
--- /dev/null
+++ b/analysis/webservice/algorithms/HofMoeller.py
@@ -0,0 +1,366 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import itertools
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.pyplot as plt
+import mpld3
+import numpy as np
+from matplotlib import cm
+from matplotlib.ticker import FuncFormatter
+
+from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
+
+SENTINEL = 'STOP'
+LATITUDE = 0
+LONGITUDE = 1
+
+
+class LongitudeHofMoellerCalculator(object):
+    def longitude_time_hofmoeller_stats(self, tile, index):
+        stat = {
+            'sequence': index,
+            'time': np.ma.min(tile.times),
+            'lons': []
+        }
+
+        points = list(tile.nexus_point_generator())
+        data = sorted(points, key=lambda p: p.longitude)
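+        # itertools.groupby only groups consecutive equal keys, so the sort above is required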
+        points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
+
+        for lon, points_at_lon in points_by_lon:
+            values_at_lon = np.array([point.data_val for point in points_at_lon])
+            stat['lons'].append({
+                'longitude': float(lon),
+                'cnt': len(values_at_lon),
+                'avg': np.mean(values_at_lon).item(),
+                'max': np.max(values_at_lon).item(),
+                'min': np.min(values_at_lon).item(),
+                'std': np.std(values_at_lon).item()
+            })
+
+        return stat
+
+
+class LatitudeHofMoellerCalculator(object):
+    def latitude_time_hofmoeller_stats(self, tile, index):
+        stat = {
+            'sequence': index,
+            'time': np.ma.min(tile.times),
+            'lats': []
+        }
+
+        points = list(tile.nexus_point_generator())
+        data = sorted(points, key=lambda p: p.latitude)
+        points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
+
+        for lat, points_at_lat in points_by_lat:
+            values_at_lat = np.array([point.data_val for point in points_at_lat])
+
+            stat['lats'].append({
+                'latitude': float(lat),
+                'cnt': len(values_at_lat),
+                'avg': np.mean(values_at_lat).item(),
+                'max': np.max(values_at_lat).item(),
+                'min': np.min(values_at_lat).item(),
+                'std': np.std(values_at_lat).item()
+            })
+
+        return stat
+
+
+class BaseHoffMoellerHandlerImpl(NexusHandler):
+    def __init__(self):
+        NexusHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
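+    # Deseason by subtracting, for each of the 12 calendar-month slots, the
+    # average of that slot's values across all years in the series.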
+    def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
+        shape = (len(results), len(results[0][pivot]))
+        if shape[0] <= 12:
+            return results
+        for a in range(0, 12):
+            values = []
+            for b in range(a, len(results), 12):
+                values.append(np.average([l[field] for l in results[b][pivot]]))
+            avg = np.average(values)
+
+            for b in range(a, len(results), 12):
+                for l in results[b][pivot]:
+                    l["%sSeasonal" % field] = l[field] - avg
+
+        return results
+
+    def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append)
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
+        return results
+
+
+@nexus_handler
+class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
+    name = "Latitude/Time HofMoeller"
+    path = "/latitudeTimeHofMoeller"
+    description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        BaseHoffMoellerHandlerImpl.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+                                                            computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+                                                            computeOptions.get_dataset()[0],
+                                                            computeOptions.get_start_time(),
+                                                            computeOptions.get_end_time())
+
+        if len(tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+        results = []
+        if maxprocesses == 1:
+            calculator = LatitudeHofMoellerCalculator()
+            for x, tile in enumerate(tiles):
+                result = calculator.latitude_time_hofmoeller_stats(tile, x)
+                results.append(result)
+        else:
+            manager = Manager()
+            work_queue = manager.Queue()
+            done_queue = manager.Queue()
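+            # Enqueue one task per tile, then one SENTINEL per worker so each consumer knows when to stop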
+            for x, tile in enumerate(tiles):
+                work_queue.put(
+                    ('latitude_time_hofmoeller_stats', tile, x))
+            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+            # Start new processes to handle the work
+            pool = Pool(maxprocesses)
+            [pool.apply_async(pool_worker, (LATITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+            pool.close()
+
+            # Collect the results
+            for x, tile in enumerate(tiles):
+                result = done_queue.get()
+                try:
+                    error_str = result['error']
+                    self.log.error(error_str)
+                    raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.")
+                except KeyError:
+                    pass
+
+                results.append(result)
+
+            pool.terminate()
+            manager.shutdown()
+
+        results = sorted(results, key=lambda entry: entry["time"])
+
+        results = self.applyDeseasonToHofMoeller(results)
+
+        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+        return result
+
+
+@nexus_handler
+class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
+    name = "Longitude/Time HofMoeller"
+    path = "/longitudeTimeHofMoeller"
+    description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        BaseHoffMoellerHandlerImpl.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+                                                            computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+                                                            computeOptions.get_dataset()[0],
+                                                            computeOptions.get_start_time(),
+                                                            computeOptions.get_end_time())
+
+        if len(tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
+
+        results = []
+        if maxprocesses == 1:
+            calculator = LongitudeHofMoellerCalculator()
+            for x, tile in enumerate(tiles):
+                result = calculator.longitude_time_hofmoeller_stats(tile, x)
+                results.append(result)
+        else:
+            manager = Manager()
+            work_queue = manager.Queue()
+            done_queue = manager.Queue()
+            for x, tile in enumerate(tiles):
+                work_queue.put(
+                    ('longitude_time_hofmoeller_stats', tile, x))
+            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]
+
+            # Start new processes to handle the work
+            pool = Pool(maxprocesses)
+            [pool.apply_async(pool_worker, (LONGITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
+            pool.close()
+
+            # Collect the results
+            for x, tile in enumerate(tiles):
+                result = done_queue.get()
+                try:
+                    error_str = result['error']
+                    self.log.error(error_str)
+                    raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.")
+                except KeyError:
+                    pass
+
+                results.append(result)
+
+            pool.terminate()
+            manager.shutdown()
+
+        results = sorted(results, key=lambda entry: entry["time"])
+
+        results = self.applyDeseasonToHofMoeller(results, pivot="lons")
+
+        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+        return result
+
+
+class HoffMoellerResults(NexusResults):
+    LATITUDE = 0
+    LONGITUDE = 1
+
+    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
+        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+        self.__type = args['type']
+
+    def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
+        cmap = cm.coolwarm
+        # ls = LightSource(315, 45)
+        # rgb = ls.shade(data, cmap)
+
+        fig, ax = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)
+
+        def yFormatter(y, pos):
+            if y < len(coordSeries):
+                return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
+            else:
+                return ""
+
+        def xFormatter(x, pos):
+            if x < len(timeSeries):
+                return timeSeries[int(x)].strftime('%b %Y')
+            else:
+                return ""
+
+        ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
+        ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
+
+        ax.set_title(title)
+        ax.set_ylabel(coordName)
+        ax.set_xlabel('Date')
+
+        fig.colorbar(cax)
+        fig.autofmt_xdate()
+
+        labels = ['point {0}'.format(i + 1) for i in range(len(data))]
+        # plugins.connect(fig, plugins.MousePosition(fontsize=14))
+        tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+    def createLongitudeHoffmueller(self, res, meta):
+        lonSeries = [m['longitude'] for m in res[0]['lons']]
+        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+        data = np.zeros((len(lonSeries), len(timeSeries)))
+
+        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "avg"
+
+        for t in range(0, len(timeSeries)):
+            timeSet = res[t]
+            for l in range(0, len(lonSeries)):
+                latSet = timeSet['lons'][l]
+                value = latSet[plotSeries]
+                data[len(lonSeries) - l - 1][t] = value
+
+        title = meta['title']
+        source = meta['source']
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
+                                      "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+    def createLatitudeHoffmueller(self, res, meta):
+        latSeries = [m['latitude'] for m in res[0]['lats']]
+        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+        data = np.zeros((len(latSeries), len(timeSeries)))
+
+        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+        if plotSeries is None:
+            plotSeries = "avg"
+
+        for t in range(0, len(timeSeries)):
+            timeSet = res[t]
+            for l in range(0, len(latSeries)):
+                latSet = timeSet['lats'][l]
+                value = latSet[plotSeries]
+                data[len(latSeries) - l - 1][t] = value
+
+        title = meta['title']
+        source = meta['source']
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
+                                      title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+    def toImage(self):
+        res = self.results()
+        meta = self.meta()
+
+        if self.__type == HoffMoellerResults.LATITUDE:
+            return self.createLatitudeHoffmueller(res, meta)
+        elif self.__type == HoffMoellerResults.LONGITUDE:
+            return self.createLongitudeHoffmueller(res, meta)
+        else:
+            raise Exception("Unsupported HoffMoeller Plot Type")
+
+
+def pool_worker(type, work_queue, done_queue):
+    try:
+
+        if type == LATITUDE:
+            calculator = LatitudeHofMoellerCalculator()
+        elif type == LONGITUDE:
+            calculator = LongitudeHofMoellerCalculator()
+
+        for work in iter(work_queue.get, SENTINEL):
+            scifunction = work[0]
+            args = work[1:]
+            result = calculator.__getattribute__(scifunction)(*args)
+            done_queue.put(result)
+
+    except Exception as e:
+        e_str = traceback.format_exc()
+        done_queue.put({'error': e_str})

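applyDeseasonToHofMoellerByField above removes the seasonal cycle by
subtracting, for each of the 12 calendar-month slots, that slot's average
across all years. A standalone 1-D sketch of the same arithmetic on a
synthetic monthly series:

    import numpy as np

    def deseason(series):
        # Subtract each calendar-month slot's multi-year mean; assumes
        # consecutive monthly values, so slot a holds indices a, a+12, a+24, ...
        series = np.asarray(series, dtype=float)
        out = series.copy()
        if len(series) <= 12:
            return out
        for month in range(12):
            out[month::12] -= series[month::12].mean()
        return out

    # Three years of a repeating 12-month cycle plus a linear trend;
    # the repeating cycle drops out of the result
    print(deseason(np.arange(36) % 12 + np.linspace(0.0, 1.0, 36)))
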
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/LongitudeLatitudeMap.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
new file mode 100644
index 0000000..be9123e
--- /dev/null
+++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
@@ -0,0 +1,251 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import logging
+import math
+from datetime import datetime
+
+from pytz import timezone
+from shapely.geometry import box
+
+from webservice.NexusHandler import NexusHandler, nexus_handler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+tile_service = None
+
+
+@nexus_handler
+class LongitudeLatitudeMapHandlerImpl(NexusHandler):
+    name = "Longitude/Latitude Time Average Map"
+    path = "/longitudeLatitudeMap"
+    description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "One or more comma-separated dataset shortnames"
+        },
+        "minLat": {
+            "name": "Minimum Latitude",
+            "type": "float",
+            "description": "Minimum (Southern) bounding box Latitude"
+        },
+        "maxLat": {
+            "name": "Maximum Latitude",
+            "type": "float",
+            "description": "Maximum (Northern) bounding box Latitude"
+        },
+        "minLon": {
+            "name": "Minimum Longitude",
+            "type": "float",
+            "description": "Minimum (Western) bounding box Longitude"
+        },
+        "maxLon": {
+            "name": "Maximum Longitude",
+            "type": "float",
+            "description": "Maximum (Eastern) bounding box Longitude"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        NexusHandler.__init__(self, skipCassandra=True)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+        try:
+            ds = request.get_dataset()[0]
+        except:
+            raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+        try:
+            bounding_polygon = box(request.get_min_lon(), request.get_min_lat(), request.get_max_lon(),
+                                   request.get_max_lat())
+        except:
+            raise NexusProcessingException(
+                reason="'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required.",
+                code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+
+    def calc(self, request, **args):
+
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch = self.parse_arguments(request)
+
+        boxes = self._tile_service.get_distinct_bounding_boxes_in_polygon(bounding_polygon, ds,
+                                                                          start_seconds_from_epoch,
+                                                                          end_seconds_from_epoch)
+        point_avg_over_time = lat_lon_map_driver(bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, ds,
+                                                 [a_box.bounds for a_box in boxes])
+
+        kwargs = {
+            "minLon": bounding_polygon.bounds[0],
+            "minLat": bounding_polygon.bounds[1],
+            "maxLon": bounding_polygon.bounds[2],
+            "maxLat": bounding_polygon.bounds[3],
+            "ds": ds,
+            "startTime": datetime.utcfromtimestamp(start_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ'),
+            "endTime": datetime.utcfromtimestamp(end_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ')
+        }
+        return LongitudeLatitudeMapResults(
+            results=LongitudeLatitudeMapHandlerImpl.results_to_dicts(point_avg_over_time), meta=None,
+            **kwargs)
+
+    @staticmethod
+    def results_to_dicts(results):
+
+        # ((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt))
+        return [{
+                    'lon': result[0][0],
+                    'lat': result[0][1],
+                    'slope': result[1][0] if not math.isnan(result[1][0]) else 'NaN',
+                    'intercept': result[1][1] if not math.isnan(result[1][1]) else 'NaN',
+                    'r': result[1][2],
+                    'p': result[1][3],
+                    'stderr': result[1][4] if not math.isinf(result[1][4]) else 'Inf',
+                    'avg': result[1][5],
+                    'max': result[1][6],
+                    'min': result[1][7],
+                    'std': result[1][8],
+                    'cnt': result[1][9],
+                } for result in results]
+
+
+def pool_initializer():
+    from nexustiles.nexustiles import NexusTileService
+    global tile_service
+    tile_service = NexusTileService()
+    # TODO This is a hack to make sure each sub-process uses its own connection to Cassandra. data-access needs to be updated
+    from cassandra.cqlengine import connection
+    from multiprocessing import current_process
+
+    connection.register_connection(current_process().name, [host.address for host in connection.get_session().hosts])
+    connection.set_default_connection(current_process().name)
+
+
+def lat_lon_map_driver(search_bounding_polygon, search_start, search_end, ds, distinct_boxes):
+    from functools import partial
+    from nexustiles.nexustiles import NexusTileService
+    # Start new processes to handle the work
+    # pool = Pool(5, pool_initializer)
+
+    func = partial(regression_on_tiles, search_bounding_polygon_wkt=search_bounding_polygon.wkt,
+                   search_start=search_start, search_end=search_end, ds=ds)
+
+    global tile_service
+    tile_service = NexusTileService()
+    map_result = map(func, distinct_boxes)
+    return [item for sublist in map_result for item in sublist]
+    # TODO Use for multiprocessing:
+    # result = pool.map_async(func, distinct_boxes)
+    #
+    # pool.close()
+    # while not result.ready():
+    #     print "Not ready"
+    #     time.sleep(5)
+    #
+    # print result.successful()
+    # print result.get()[0]
+    # pool.join()
+    # pool.terminate()
+    #
+    # return [item for sublist in result.get() for item in sublist]
+
+
+def calc_linear_regression(arry, xarry):
+    from scipy.stats import linregress
+    slope, intercept, r_value, p_value, std_err = linregress(arry, xarry)
+    return slope, intercept, r_value, p_value, std_err
+
+
+def regression_on_tiles(tile_bounds, search_bounding_polygon_wkt, search_start, search_end, ds):
+    if len(tile_bounds) < 1:
+        return []
+
+    import numpy as np
+    import operator
+    from shapely import wkt
+    from shapely.geometry import box
+    search_bounding_shape = wkt.loads(search_bounding_polygon_wkt)
+    tile_bounding_shape = box(*tile_bounds)
+
+    # Load all tiles for given (exact) bounding box across the search time range
+    tiles = tile_service.find_tiles_by_exact_bounds(tile_bounds, ds, search_start, search_end)
+    if search_bounding_shape.contains(tile_bounding_shape):
+        # The tile bounds are totally contained in the search area, we don't need to mask it.
+        pass
+    else:
+        # The tile bounds cross the search area borders, we need to mask the tiles to the search area
+        tiles = tile_service.mask_tiles_to_polygon(wkt.loads(search_bounding_polygon_wkt), tiles)
+    # If all tiles end up being masked, there is no work to do
+    if len(tiles) < 1:
+        return []
+    tiles.sort(key=operator.attrgetter('min_time'))
+
+    stacked_tile_data = np.stack(tuple([np.squeeze(tile.data, 0) for tile in tiles]))
+    stacked_tile_lons = np.stack([tile.longitudes for tile in tiles])
+    stacked_tile_lats = np.stack([tile.latitudes for tile in tiles])
+
+    x_array = np.arange(stacked_tile_data.shape[0])
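+    # Run the regression once per (lat, lon) pixel along the time axis; each
+    # pixel's time series is handed to calc_linear_regression together with x_array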
+    point_regressions = np.apply_along_axis(calc_linear_regression, 0, stacked_tile_data, x_array)
+    point_means = np.nanmean(stacked_tile_data, 0)
+    point_maximums = np.nanmax(stacked_tile_data, 0)
+    point_minimums = np.nanmin(stacked_tile_data, 0)
+    point_st_deviations = np.nanstd(stacked_tile_data, 0)
+    point_counts = np.ma.count(np.ma.masked_invalid(stacked_tile_data), 0)
+
+    results = []
+    for lat_idx, lon_idx in np.ndindex(point_means.shape):
+        lon, lat = np.max(stacked_tile_lons[:, lon_idx]), np.max(stacked_tile_lats[:, lat_idx])
+        pcnt = point_counts[lat_idx, lon_idx]
+
+        if pcnt == 0:
+            continue
+
+        mean = point_means[lat_idx, lon_idx]
+        pmax = point_maximums[lat_idx, lon_idx]
+        pmin = point_minimums[lat_idx, lon_idx]
+        pstd = point_st_deviations[lat_idx, lon_idx]
+        slope, intercept, r_value, p_value, std_err = point_regressions[:, lat_idx, lon_idx]
+
+        results.append(((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt)))
+
+    return results
+
+
+class LongitudeLatitudeMapResults(NexusResults):
+    def __init__(self, results=None, meta=None, computeOptions=None, **kwargs):
+        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions, **kwargs)
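
regression_on_tiles above computes its per-pixel statistics by applying
calc_linear_regression along the time axis of the stacked tile data. A
standalone sketch of that np.apply_along_axis pattern on synthetic data (names
and shapes here are illustrative; note the sketch passes the time index as the
x argument of linregress):

    import numpy as np
    from scipy.stats import linregress

    def regress(series, x):
        # The five linregress statistics for one pixel's time series
        slope, intercept, r, p, stderr = linregress(x, series)
        return slope, intercept, r, p, stderr

    # 24 time steps of a 3x4 lat/lon grid with a 0.5-per-step trend plus noise
    x = np.arange(24)
    stack = 0.5 * x[:, None, None] + np.random.rand(24, 3, 4)

    stats = np.apply_along_axis(regress, 0, stack, x)  # shape (5, 3, 4)
    print(stats[0])  # per-pixel slopes, each near 0.5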