You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:24 UTC

[47/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/fetchedgeimpl.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
new file mode 100644
index 0000000..d4a6c4a
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
@@ -0,0 +1,201 @@
+import json
+import traceback
+from datetime import datetime
+from multiprocessing.pool import ThreadPool
+
+import requests
+
+import geo
+import values
+from webservice.webmodel import NexusProcessingException
+
+
+def __parseDatetime(dtString):
+    dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+    epoch = datetime.utcfromtimestamp(0)
+    time = (dt - epoch).total_seconds() * 1000.0
+    return time
+
+
+def __parseLocation(locString):
+    if "Point" in locString:
+        locString = locString[6:-1]
+
+    if "," in locString:
+        latitude = float(locString.split(",")[0])
+        longitude = float(locString.split(",")[1])
+    else:
+        latitude = float(locString.split(" ")[1])
+        longitude = float(locString.split(" ")[0])
+
+    return (latitude, longitude)
+
+
+def __resultRawToUsable(resultdict):
+    resultdict["time"] = __parseDatetime(resultdict["time"])
+    latitude, longitude = __parseLocation(resultdict["point"])
+
+    resultdict["x"] = longitude
+    resultdict["y"] = latitude
+
+    if "id" not in resultdict and "metadata" in resultdict:
+        resultdict["id"] = resultdict["metadata"]
+
+    resultdict["id"] = "id-%s" % resultdict["id"]
+
+    if "device" in resultdict:
+        resultdict["device"] = values.getDeviceById(resultdict["device"])
+
+    if "platform" in resultdict:
+        resultdict["platform"] = values.getPlatformById(resultdict["platform"])
+
+    if "mission" in resultdict:
+        resultdict["mission"] = values.getMissionById(resultdict["mission"])
+
+    if "sea_surface_temperature" in resultdict:
+        resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
+        del resultdict["sea_surface_temperature"]
+
+    return resultdict
+
+
+def __fetchJson(url, params, trycount=1, maxtries=5):
+    if trycount > maxtries:
+        raise Exception("Maximum retries attempted.")
+    if trycount > 1:
+        print "Retry #", trycount
+    r = requests.get(url, params=params, timeout=500.000)
+
+    print r.url
+
+    if r.status_code != 200:
+        return __fetchJson(url, params, trycount + 1, maxtries)
+    try:
+        results = json.loads(r.text)
+        return results
+    except:
+        return __fetchJson(url, params, trycount + 1, maxtries)
+
+
+def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0, platforms=None,
+              pageCallback=None):
+    params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
+              "startIndex": startIndex, "stats": "true"}
+
+    if depth_min is not None:
+        params['minDepth'] = depth_min
+    if depth_max is not None:
+        params['maxDepth'] = depth_max
+
+    if platforms is not None:
+        params["platform"] = platforms.split(",")
+
+    resultsRaw = __fetchJson(endpoint["url"], params)
+    boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+    if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0:  # Double-sanity check
+        return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer
+
+    try:
+        results = []
+        for resultdict in resultsRaw["results"]:
+            result = __resultRawToUsable(resultdict)
+            result["source"] = endpoint["name"]
+            boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
+            results.append(result)
+
+        if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0:
+            stats = resultsRaw["stats_fields"]
+            if "lat" in stats and "lon" in stats:
+                boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
+                                             west=stats['lon']['min'], east=stats['lon']['max'])
+
+        if pageCallback is not None:
+            pageCallback(results)
+
+        '''
+            If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+        '''
+        if pageCallback is None:
+            return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+                resultsRaw["itemsPerPage"]), boundsConstrainer
+        else:
+            return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+                resultsRaw["itemsPerPage"]), boundsConstrainer
+    except:
+        print "Invalid or missing JSON in response."
+        traceback.print_exc()
+        raise NexusProcessingException(reason="Invalid or missing JSON in response.")
+        # return [], 0, startIndex, itemsPerPage, boundsConstrainer
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    startIndex = 0
+    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+                                                                                            endTime, bbox,
+                                                                                            depth_min, depth_max, 0,
+                                                                                            startIndex, platforms)
+    return totalResults, boundsConstrainer
+
+
+def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
+    results = []
+    startIndex = 0
+
+    mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+    # First isn't parellel so we can get the ttl results, forced items per page, etc...
+    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+                                                                                            endTime, bbox,
+                                                                                            depth_min, depth_max,
+                                                                                            endpoint["itemsPerPage"],
+                                                                                            startIndex, platforms,
+                                                                                            pageCallback)
+    results = results + pageResults
+    mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)
+
+    pool = ThreadPool(processes=endpoint["fetchThreads"])
+    mpResults = [pool.apply_async(__doQuery, args=(
+        endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
+                 range(len(pageResults), totalResults, itemsPerPageR)]
+    pool.close()
+    pool.join()
+
+    '''
+        If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+    '''
+    if pageCallback is None:
+        mpResults = [p.get() for p in mpResults]
+        for mpResult in mpResults:
+            results = results + mpResult[0]
+            mainBoundsConstrainer.testOtherConstrainer(mpResult[4])
+
+        return results, mainBoundsConstrainer
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+    results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+    if placeholders:
+        trimmedResults = []
+        for item in results:
+            depth = None
+            if "depth" in item:
+                depth = item["depth"]
+            if "sea_water_temperature_depth" in item:
+                depth = item["sea_water_temperature_depth"]
+
+            trimmedItem = {
+                "x": item["x"],
+                "y": item["y"],
+                "source": item["source"],
+                "time": item["time"],
+                "device": item["device"] if "device" in item else None,
+                "platform": item["platform"],
+                "depth": depth
+            }
+            trimmedResults.append(trimmedItem)
+
+        results = trimmedResults
+
+    return results, boundsConstrainer

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/geo.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py
new file mode 100644
index 0000000..ece1788
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/geo.py
@@ -0,0 +1,113 @@
+import math
+
+
+MEAN_RADIUS_EARTH_METERS = 6371010.0
+EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0
+POLAR_RADIUS_EARTH_METERS = 6356752.0
+FLATTENING_EARTH = 298.257223563
+MEAN_RADIUS_EARTH_MILES = 3958.8
+
+
+class DistanceUnit(object):
+    METERS = 0
+    MILES = 1
+
+
+# Haversine implementation for great-circle distances between two points
+def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS):
+    if units == DistanceUnit.METERS:
+        R = MEAN_RADIUS_EARTH_METERS
+    elif units == DistanceUnit.MILES:
+        R = MEAN_RADIUS_EARTH_MILES
+    else:
+        raise Exception("Invalid units specified")
+    x0r = x0 * (math.pi / 180.0) # To radians
+    x1r = x1 * (math.pi / 180.0) # To radians
+    xd = (x1 - x0) * (math.pi / 180.0)
+    yd = (y1 - y0) * (math.pi / 180.0)
+
+    a = math.sin(xd/2.0) * math.sin(xd/2.0) + \
+        math.cos(x0r) * math.cos(x1r) * \
+        math.sin(yd / 2.0) * math.sin(yd / 2.0)
+    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
+    d = R * c
+    return d
+
+
+# Equirectangular approximation for when performance is key. Better at smaller distances
+def equirectangularApprox(x0, y0, x1, y1):
+    R = 6371000.0 # Meters
+    x0r = x0 * (math.pi / 180.0) # To radians
+    x1r = x1 * (math.pi / 180.0)
+    y0r = y0 * (math.pi / 180.0)
+    y1r = y1 * (math.pi / 180.0)
+
+    x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0)
+    y = x1r - x0r
+    d = math.sqrt(x*x + y*y) * R
+    return d
+
+
+
+class BoundingBox(object):
+
+    def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+        if asString is not None:
+            bboxParts = asString.split(",")
+            self.west=float(bboxParts[0])
+            self.south=float(bboxParts[1])
+            self.east=float(bboxParts[2])
+            self.north=float(bboxParts[3])
+        else:
+            self.north = north
+            self.south = south
+            self.west = west
+            self.east = east
+
+    def toString(self):
+        return "%s,%s,%s,%s"%(self.west, self.south, self.east, self.north)
+
+    def toMap(self):
+        return {
+            "xmin": self.west,
+            "xmax": self.east,
+            "ymin": self.south,
+            "ymax": self.north
+        }
+
+'''
+    Constrains, does not expand.
+'''
+class BoundsConstrainer(BoundingBox):
+
+    def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+        BoundingBox.__init__(self, north, south, west, east, asString)
+
+    def testNorth(self, v):
+        if v is None:
+            return
+        self.north = max([self.north, v])
+
+    def testSouth(self, v):
+        if v is None:
+            return
+        self.south = min([self.south, v])
+
+    def testEast(self, v):
+        if v is None:
+            return
+        self.east = max([self.east, v])
+
+    def testWest(self, v):
+        if v is None:
+            return
+        self.west = min([self.west, v])
+
+    def testCoords(self, north=None, south=None, west=None, east=None):
+        self.testNorth(north)
+        self.testSouth(south)
+        self.testWest(west)
+        self.testEast(east)
+
+    def testOtherConstrainer(self, other):
+        self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/histogramplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py
new file mode 100644
index 0000000..a413769
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/histogramplot.py
@@ -0,0 +1,117 @@
+
+import BaseDomsHandler
+import ResultsStorage
+import string
+from cStringIO import StringIO
+import matplotlib.mlab as mlab
+
+from multiprocessing import Process, Manager
+
+import numpy as np
+import matplotlib.pyplot as plt
+import matplotlib
+matplotlib.use('Agg')
+
+
+
+
+PARAMETER_TO_FIELD = {
+    "sst": "sea_water_temperature",
+    "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+    "sst": "($^\circ$C)",
+    "sss": "(g/L)"
+}
+
+class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+    def __init__(self,  x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+        BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+        self.__primary = primary
+        self.__secondary = secondary
+        self.__x = x
+        self.__parameter = parameter
+        self.__plot = plot
+
+    def toImage(self):
+        return self.__plot
+
+
+def render(d, x, primary, secondary, parameter, norm_and_curve=False):
+
+    fig, ax = plt.subplots()
+    fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold')
+
+
+
+    n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75)
+
+
+    if norm_and_curve:
+        mean = np.mean(x)
+        variance = np.var(x)
+        sigma = np.sqrt(variance)
+        y = mlab.normpdf(bins, mean, sigma)
+        l = plt.plot(bins, y, 'r--', linewidth=1)
+
+    ax.set_title('n = %d' % len(x))
+
+    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+    ax.set_xlabel("%s - %s %s" % (primary, secondary, units))
+
+    if norm_and_curve:
+        ax.set_ylabel("Probability per unit difference")
+    else:
+        ax.set_ylabel("Frequency")
+
+    plt.grid(True)
+
+    sio = StringIO()
+    plt.savefig(sio, format='png')
+    d['plot'] = sio.getvalue()
+
+
+def renderAsync(x, primary, secondary, parameter, norm_and_curve):
+    manager = Manager()
+    d = manager.dict()
+    p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve))
+    p.start()
+    p.join()
+    return d['plot']
+
+
+def createHistogramPlot(id, parameter, norm_and_curve=False):
+
+    with ResultsStorage.ResultsRetrieval() as storage:
+        params, stats, data = storage.retrieveResults(id)
+
+    primary = params["primary"]
+    secondary = params["matchup"][0]
+
+    x = createHistTable(data, secondary, parameter)
+
+    plot = renderAsync(x, primary, secondary, parameter, norm_and_curve)
+
+    r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary,
+                                    args=params, details=stats,
+                                    bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+    return r
+
+
+def createHistTable(results, secondary, parameter):
+
+    x = []
+
+    field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+    for entry in results:
+        for match in entry["matches"]:
+            if match["source"] == secondary:
+                if field in entry and field in match:
+                    a = entry[field]
+                    b = match[field]
+                    x.append((a - b))
+
+    return x
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/insitusubset.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py
new file mode 100644
index 0000000..c688530
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/insitusubset.py
@@ -0,0 +1,248 @@
+import StringIO
+import csv
+import json
+import logging
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.webmodel import NexusProcessingException, NoDataException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS In Situ Subsetter"
+    path = "/domsinsitusubset"
+    description = "Subset a DOMS in situ source given the search domain."
+
+    params = [
+        {
+            "name": "source",
+            "type": "comma-delimited string",
+            "description": "The in situ Dataset to be sub-setted",
+            "required": "true",
+            "sample": "spurs"
+        },
+        {
+            "name": "parameter",
+            "type": "string",
+            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
+            "required": "false",
+            "default": "All",
+            "sample": "sss"
+        },
+        {
+            "name": "startTime",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+            "required": "true",
+            "sample": "2013-10-21T00:00:00Z"
+        },
+        {
+            "name": "endTime",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+            "required": "true",
+            "sample": "2013-10-31T23:59:59Z"
+        },
+        {
+            "name": "b",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+            "required": "true",
+            "sample": "-30,15,-45,30"
+        },
+        {
+            "name": "depthMin",
+            "type": "float",
+            "description": "Minimum depth of measurements. Must be less than depthMax",
+            "required": "false",
+            "default": "No limit",
+            "sample": "0"
+        },
+        {
+            "name": "depthMax",
+            "type": "float",
+            "description": "Maximum depth of measurements. Must be greater than depthMin",
+            "required": "false",
+            "default": "No limit",
+            "sample": "5"
+        },
+        {
+            "name": "platforms",
+            "type": "comma-delimited integer",
+            "description": "Platforms to include for subset consideration",
+            "required": "false",
+            "default": "All",
+            "sample": "1,2,3,4,5,6,7,8,9"
+        },
+        {
+            "name": "output",
+            "type": "string",
+            "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
+            "required": "false",
+            "default": "JSON",
+            "sample": "CSV"
+        }
+    ]
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        source_name = request.get_argument('source', None)
+        if source_name is None or source_name.strip() == '':
+            raise NexusProcessingException(reason="'source' argument is required", code=400)
+
+        parameter_s = request.get_argument('parameter', None)
+        if parameter_s not in ['sst', 'sss', 'wind', None]:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+            start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+            end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            raise NexusProcessingException(
+                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                code=400)
+
+        depth_min = request.get_decimal_arg('depthMin', default=None)
+        depth_max = request.get_decimal_arg('depthMax', default=None)
+
+        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+            raise NexusProcessingException(
+                reason="Depth Min should be less than Depth Max", code=400)
+
+        platforms = request.get_argument('platforms', None)
+        if platforms is not None:
+            try:
+                p_validation = platforms.split(',')
+                p_validation = [int(p) for p in p_validation]
+                del p_validation
+            except:
+                raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+        return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms
+
+    def calc(self, request, **args):
+
+        source_name, parameter_s, start_time, end_time, bounding_polygon, \
+        depth_min, depth_max, platforms = self.parse_arguments(request)
+
+        with requests.session() as edge_session:
+            edge_results = query_edge(source_name, parameter_s, start_time, end_time,
+                                      ','.join([str(bound) for bound in bounding_polygon.bounds]),
+                                      platforms, depth_min, depth_max, edge_session)['results']
+
+        if len(edge_results) == 0:
+            raise NoDataException
+        return InSituSubsetResult(results=edge_results)
+
+
+class InSituSubsetResult(object):
+    def __init__(self, results):
+        self.results = results
+
+    def toJson(self):
+        return json.dumps(self.results, indent=4)
+
+    def toCSV(self):
+        fieldnames = sorted(next(iter(self.results)).keys())
+
+        csv_mem_file = StringIO.StringIO()
+        try:
+            writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)
+
+            writer.writeheader()
+            writer.writerows(self.results)
+            csv_out = csv_mem_file.getvalue()
+        finally:
+            csv_mem_file.close()
+
+        return csv_out
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000,
+               startIndex=0, stats=True):
+    log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')
+    try:
+        startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        platform = platform.split(',')
+    except AttributeError:
+        # Assume we were passed a list
+        pass
+
+    params = {"startTime": startTime,
+              "endTime": endTime,
+              "bbox": bbox,
+              "minDepth": depth_min,
+              "maxDepth": depth_max,
+              "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+    if variable:
+        params['variable'] = variable
+    if platform:
+        params['platform'] = platform
+
+    edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+    edge_request.raise_for_status()
+    edge_response = json.loads(edge_request.text)
+
+    # Get all edge results
+    next_page_url = edge_response.get('next', None)
+    while next_page_url is not None:
+        log.debug("requesting %s" % next_page_url)
+        edge_page_request = session.get(next_page_url)
+
+        edge_page_request.raise_for_status()
+        edge_page_response = json.loads(edge_page_request.text)
+
+        edge_response['results'].extend(edge_page_response['results'])
+
+        next_page_url = edge_page_response.get('next', None)
+
+    return edge_response

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/mapplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py
new file mode 100644
index 0000000..47870a7
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/mapplot.py
@@ -0,0 +1,171 @@
+
+from mpl_toolkits.basemap import Basemap
+
+import BaseDomsHandler
+import ResultsStorage
+import numpy as np
+import string
+from cStringIO import StringIO
+
+from multiprocessing import Process, Manager
+
+import matplotlib.pyplot as plt
+import matplotlib
+matplotlib.use('Agg')
+
+
+PARAMETER_TO_FIELD = {
+    "sst": "sea_water_temperature",
+    "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+    "sst": "($^\circ$ C)",
+    "sss": "(g/L)"
+}
+
+
+def __square(minLon, maxLon, minLat, maxLat):
+    if maxLat - minLat > maxLon - minLon:
+        a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0
+        minLon -= a
+        maxLon += a
+    elif maxLon - minLon > maxLat - minLat:
+        a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0
+        minLat -= a
+        maxLat += a
+
+    return minLon, maxLon, minLat, maxLat
+
+
+def render(d, lats, lons, z, primary, secondary, parameter):
+    fig = plt.figure()
+    ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
+
+
+    ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+    # ax.set_ylabel('Latitude')
+    # ax.set_xlabel('Longitude')
+
+    minLatA = np.min(lats)
+    maxLatA = np.max(lats)
+    minLonA = np.min(lons)
+    maxLonA = np.max(lons)
+
+    minLat = minLatA - (abs(maxLatA - minLatA) * 0.1)
+    maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1)
+
+    minLon = minLonA - (abs(maxLonA - minLonA) * 0.1)
+    maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1)
+
+    minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat)
+
+
+    # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l')
+    m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat,
+                resolution='l')
+
+
+    m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10)
+    m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10)
+
+    m.drawcoastlines()
+    m.drawmapboundary(fill_color='#99ffff')
+    m.fillcontinents(color='#cc9966', lake_color='#99ffff')
+
+    #lats, lons = np.meshgrid(lats, lons)
+
+    masked_array = np.ma.array(z, mask=np.isnan(z))
+    z = masked_array
+
+
+    values = np.zeros(len(z))
+    for i in range(0, len(z)):
+        values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10
+
+    x, y = m(lons, lats)
+
+    im1 = m.scatter(x, y, values)
+
+
+    im1.set_array(z)
+    cb = m.colorbar(im1)
+
+    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+    cb.set_label("Difference %s" % units)
+
+    sio = StringIO()
+    plt.savefig(sio, format='png')
+    plot = sio.getvalue()
+    if d is not None:
+        d['plot'] = plot
+    return plot
+
+
+
+class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+    def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+        BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+        self.__lats = lats
+        self.__lons = lons
+        self.__z = np.array(z)
+        self.__parameter = parameter
+        self.__primary = primary
+        self.__secondary = secondary
+        self.__plot = plot
+
+
+
+    def toImage(self):
+        return self.__plot
+
+
+
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+    manager = Manager()
+    d = manager.dict()
+    p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+    p.start()
+    p.join()
+    return d['plot']
+
+
+
+def createMapPlot(id, parameter):
+
+    with ResultsStorage.ResultsRetrieval() as storage:
+        params, stats, data = storage.retrieveResults(id)
+
+    primary = params["primary"]
+    secondary = params["matchup"][0]
+
+    lats = []
+    lons = []
+    z = []
+
+    field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+    for entry in data:
+        for match in entry["matches"]:
+            if match["source"] == secondary:
+
+                if field in entry and field in match:
+                    a = entry[field]
+                    b = match[field]
+                    z.append((a - b))
+                    z.append((a - b))
+                else:
+                    z.append(1.0)
+                    z.append(1.0)
+                lats.append(entry["y"])
+                lons.append(entry["x"])
+                lats.append(match["y"])
+                lons.append(match["x"])
+
+    plot = renderAsync(lats, lons, z, primary, secondary, parameter)
+    r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary,
+                                args=params,
+                                details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+    return r

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/scatterplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py
new file mode 100644
index 0000000..0cbe110
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/scatterplot.py
@@ -0,0 +1,112 @@
+
+import BaseDomsHandler
+import ResultsStorage
+import string
+from cStringIO import StringIO
+
+from multiprocessing import Process, Manager
+
+import matplotlib.pyplot as plt
+import matplotlib
+matplotlib.use('Agg')
+
+
+
+
+PARAMETER_TO_FIELD = {
+    "sst": "sea_water_temperature",
+    "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+    "sst": "($^\circ$ C)",
+    "sss": "(g/L)"
+}
+
+
+
+def render(d, x, y, z, primary, secondary, parameter):
+    fig, ax = plt.subplots()
+
+    ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+
+    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS[
+        "sst"]
+    ax.set_ylabel("%s %s" % (secondary, units))
+    ax.set_xlabel("%s %s" % (primary, units))
+
+    ax.scatter(x, y)
+
+    sio = StringIO()
+    plt.savefig(sio, format='png')
+    d['plot'] = sio.getvalue()
+
+class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+    def __init__(self,  x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+        BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+        self.__primary = primary
+        self.__secondary = secondary
+        self.__x = x
+        self.__y = y
+        self.__z = z
+        self.__parameter = parameter
+        self.__plot = plot
+
+    def toImage(self):
+        return self.__plot
+
+
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+    manager = Manager()
+    d = manager.dict()
+    p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+    p.start()
+    p.join()
+    return d['plot']
+
+
+
+def createScatterPlot(id, parameter):
+
+    with ResultsStorage.ResultsRetrieval() as storage:
+        params, stats, data = storage.retrieveResults(id)
+
+    primary = params["primary"]
+    secondary = params["matchup"][0]
+
+    x, y, z = createScatterTable(data, secondary, parameter)
+
+    plot = renderAsync(x, y, z, primary, secondary, parameter)
+
+    r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary,
+                                    args=params, details=stats,
+                                    bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+    return r
+
+
+def createScatterTable(results, secondary, parameter):
+    x = []
+    y = []
+    z = []
+
+    field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+    for entry in results:
+        for match in entry["matches"]:
+            if match["source"] == secondary:
+                if field in entry and field in match:
+                    a = entry[field]
+                    b = match[field]
+                    x.append(a)
+                    y.append(b)
+                    z.append(a - b)
+
+    return x, y, z
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/subsetter.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
new file mode 100644
index 0000000..6727a48
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -0,0 +1,245 @@
+import logging
+import os
+import tempfile
+import zipfile
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def is_blank(my_string):
+    return not (my_string and my_string.strip() != '')
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS Subsetter"
+    path = "/domssubset"
+    description = "Subset DOMS sources given the search domain"
+
+    params = {
+        "dataset": {
+            "name": "NEXUS Dataset",
+            "type": "string",
+            "description": "The NEXUS dataset. Optional but at least one of 'dataset' or 'insitu' are required"
+        },
+        "insitu": {
+            "name": "In Situ sources",
+            "type": "comma-delimited string",
+            "description": "The in situ source(s). Optional but at least one of 'dataset' or 'insitu' are required"
+        },
+        "parameter": {
+            "name": "Data Parameter",
+            "type": "string",
+            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "depthMin": {
+            "name": "Minimum Depth",
+            "type": "float",
+            "description": "Minimum depth of measurements. Must be less than depthMax. Optional"
+        },
+        "depthMax": {
+            "name": "Maximum Depth",
+            "type": "float",
+            "description": "Maximum depth of measurements. Must be greater than depthMin. Optional"
+        },
+        "platforms": {
+            "name": "Platforms",
+            "type": "comma-delimited integer",
+            "description": "Platforms to include for subset consideration. Optional"
+        },
+        "output": {
+            "name": "Output",
+            "type": "string",
+            "description": "Output type. Only 'ZIP' is currently supported. Required"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        primary_ds_name = request.get_argument('dataset', None)
+        matchup_ds_names = request.get_argument('insitu', None)
+
+        if is_blank(primary_ds_name) and is_blank(matchup_ds_names):
+            raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required",
+                                           code=400)
+
+        if matchup_ds_names is not None:
+            try:
+                matchup_ds_names = matchup_ds_names.split(',')
+            except:
+                raise NexusProcessingException(reason="'insitu' argument should be a comma-seperated list", code=400)
+
+        parameter_s = request.get_argument('parameter', None)
+        if parameter_s not in ['sst', 'sss', 'wind']:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+            start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+            end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            raise NexusProcessingException(
+                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                code=400)
+
+        depth_min = request.get_decimal_arg('depthMin', default=None)
+        depth_max = request.get_decimal_arg('depthMax', default=None)
+
+        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+            raise NexusProcessingException(
+                reason="Depth Min should be less than Depth Max", code=400)
+
+        platforms = request.get_argument('platforms', None)
+        if platforms is not None:
+            try:
+                p_validation = platforms.split(',')
+                p_validation = [int(p) for p in p_validation]
+                del p_validation
+            except:
+                raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+        return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+               bounding_polygon, depth_min, depth_max, platforms
+
+    def calc(self, request, **args):
+
+        primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+        bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request)
+
+        primary_url = "https://doms.jpl.nasa.gov/datainbounds"
+        primary_params = {
+            'ds': primary_ds_name,
+            'parameter': parameter_s,
+            'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+            'startTime': start_time,
+            'endTime': end_time,
+            'output': "CSV"
+        }
+
+        matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset"
+        matchup_params = {
+            'source': None,
+            'parameter': parameter_s,
+            'startTime': start_time,
+            'endTime': end_time,
+            'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+            'depthMin': depth_min,
+            'depthMax': depth_max,
+            'platforms': platforms,
+            'output': 'CSV'
+        }
+
+        primary_temp_file_path = None
+        matchup_downloads = None
+
+        with requests.session() as session:
+
+            if not is_blank(primary_ds_name):
+                # Download primary
+                primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv')
+                download_file(primary_url, primary_temp_file_path, session, params=primary_params)
+
+            if len(matchup_ds_names) > 0:
+                # Download matchup
+                matchup_downloads = {}
+                for matchup_ds in matchup_ds_names:
+                    matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv')
+                    matchup_params['source'] = matchup_ds
+                    download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params)
+
+        # Zip downloads
+        date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"),
+                                datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"))
+        bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds
+        zip_dir = tempfile.mkdtemp()
+        zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds)
+        with zipfile.ZipFile(zip_path, 'w') as my_zip:
+            if primary_temp_file_path:
+                my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds))
+            if matchup_downloads:
+                for matchup_ds, download in matchup_downloads.iteritems():
+                    my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds))
+
+        # Clean up
+        if primary_temp_file_path:
+            os.remove(primary_temp_file_path)
+        if matchup_downloads:
+            for matchup_ds, download in matchup_downloads.iteritems():
+                os.remove(download[1])
+
+        return SubsetResult(zip_path)
+
+
+class SubsetResult(object):
+    def __init__(self, zip_path):
+        self.zip_path = zip_path
+
+    def toJson(self):
+        raise NotImplementedError
+
+    def toZip(self):
+        with open(self.zip_path, 'rb') as zip_file:
+            zip_contents = zip_file.read()
+
+        return zip_contents
+
+    def cleanup(self):
+        os.remove(self.zip_path)
+
+
+def download_file(url, filepath, session, params=None):
+    r = session.get(url, params=params, stream=True)
+    with open(filepath, 'wb') as f:
+        for chunk in r.iter_content(chunk_size=1024):
+            if chunk:  # filter out keep-alive new chunks
+                f.write(chunk)

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/values.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py
new file mode 100644
index 0000000..5e98ce9
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/values.py
@@ -0,0 +1,57 @@
+PLATFORMS = [
+    {"id": 1, "desc": "ship"},
+    {"id": 2, "desc": "moored surface buoy"},
+    {"id": 3, "desc": "drifting surface float"},
+    {"id": 4, "desc": "drifting subsurface profiling float"},
+    {"id": 5, "desc": "autonomous underwater vehicle"},
+    {"id": 6, "desc": "offshore structure"},
+    {"id": 7, "desc": "coastal structure"},
+    {"id": 8, "desc": "towed unmanned submersible"},
+    {"id": 9, "desc": "orbiting satellite"}
+]
+
+DEVICES = [
+    {"id": 1, "desc": "bathythermographs"},
+    {"id": 2, "desc": "discrete water samplers"},
+    {"id": 3, "desc": "CTD"},
+    {"id": 4, "desc": "Current profilers  / acousticDopplerCurrentProfiler"},
+    {"id": 5, "desc": "radiometers"},
+    {"id": 6, "desc": "scatterometers"}
+]
+
+MISSIONS = [
+    {"id": 1, "desc": "SAMOS"},
+    {"id": 2, "desc": "ICOADS"},
+    {"id": 3, "desc": "Aquarius"},
+    {"id": 4, "desc": "SPURS1"}
+]
+
+
+def getDescById(list, id):
+    for item in list:
+        if item["id"] == id:
+            return item["desc"]
+    return id
+
+
+def getPlatformById(id):
+    return getDescById(PLATFORMS, id)
+
+
+def getDeviceById(id):
+    return getDescById(DEVICES, id)
+
+
+def getMissionById(id):
+    return getDescById(MISSIONS, id)
+
+
+def getDescByListNameAndId(listName, id):
+    if listName.upper() == "PLATFORM":
+        return getPlatformById(id)
+    elif listName.upper() == "DEVICE":
+        return getDeviceById(id)
+    elif listName.upper() == "MISSION":
+        return getMissionById(id)
+    else:
+        raise Exception("Invalid list name specified ('%s')" % listName)

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/workerthread.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py
new file mode 100644
index 0000000..097e604
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/workerthread.py
@@ -0,0 +1,51 @@
+import os
+import sys
+import threading
+
+class WorkerThread(threading.Thread):
+
+    def __init__(self, method, params):
+        threading.Thread.__init__(self)
+        self.method = method
+        self.params = params
+        self.completed = False
+        self.results = None
+
+
+    def run(self):
+
+
+        self.results = self.method(*self.params)
+        self.completed = True
+
+
+def __areAllComplete(threads):
+
+    for thread in threads:
+        if not thread.completed:
+            return False
+
+    return True
+
+def wait(threads, startFirst=False, poll=0.5):
+
+    if startFirst:
+        for thread in threads:
+            thread.start()
+
+    while not __areAllComplete(threads):
+        threading._sleep(poll)
+
+
+
+def foo(param1, param2):
+    print param1, param2
+    return "c"
+
+if __name__ == "__main__":
+
+    thread = WorkerThread(foo, params=("a", "b"))
+    thread.start()
+    while not thread.completed:
+        threading._sleep(0.5)
+    print thread.results

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/ClimMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
new file mode 100644
index 0000000..77e3c38
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -0,0 +1,257 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import logging
+from calendar import timegm, monthrange
+from datetime import datetime
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+
+@nexus_handler
+class ClimMapSparkHandlerImpl(SparkHandler):
+    name = "Climatology Map Spark"
+    path = "/climMapSpark"
+    description = "Computes a Latitude/Longitude Time Average map for a given month given an arbitrary geographical area and year range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        SparkHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+        # self.log.setLevel(logging.DEBUG)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+        # print 'Started tile', tile_bounds
+        # sys.stdout.flush()
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+        days_at_a_time = 90
+        # days_at_a_time = 30
+        # days_at_a_time = 7
+        # days_at_a_time = 1
+        # print 'days_at_a_time = ', days_at_a_time
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+            # t1 = time()
+            # print 'nexus call start at time %f' % t1
+            # sys.stdout.flush()
+            nexus_tiles = \
+                ClimMapSparkHandlerImpl.query_by_parts(tile_service,
+                                                       min_lat, max_lat,
+                                                       min_lon, max_lon,
+                                                       ds,
+                                                       t_start,
+                                                       t_end,
+                                                       part_dim=2)
+            # nexus_tiles = \
+            #    tile_service.get_tiles_bounded_by_box(min_lat, max_lat, 
+            #                                          min_lon, max_lon, 
+            #                                          ds=ds, 
+            #                                          start_time=t_start, 
+            #                                          end_time=t_end)
+            # t2 = time()
+            # print 'nexus call end at time %f' % t2
+            # print 'secs in nexus call: ', t2-t1
+            # sys.stdout.flush()
+            # print 't %d to %d - Got %d tiles' % (t_start, t_end,
+            #                                     len(nexus_tiles))
+            # for nt in nexus_tiles:
+            #    print nt.granule
+            #    print nt.section_spec
+            #    print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
+            #    print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
+            # sys.stdout.flush()
+
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0,
+                              min_y:max_y + 1,
+                              min_x:max_x + 1]).astype(np.uint8)
+            t_start = t_end + 1
+
+        # print 'cnt_tile = ', cnt_tile
+        # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
+        # sum_tile.mask = cnt_tile.mask
+        # avg_tile = sum_tile / cnt_tile
+        # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
+        # print 'Finished tile', tile_bounds
+        # print 'Tile avg = ', avg_tile
+        # sys.stdout.flush()
+        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
+
+    def _month_from_timestamp(self, t):
+        return datetime.utcfromtimestamp(t).month
+
+    def calc(self, computeOptions, **args):
+        """
+
+        :param computeOptions: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+        self._setQueryParams(computeOptions.get_dataset()[0],
+                             (float(computeOptions.get_min_lat()),
+                              float(computeOptions.get_max_lat()),
+                              float(computeOptions.get_min_lon()),
+                              float(computeOptions.get_max_lon())),
+                             start_year=computeOptions.get_start_year(),
+                             end_year=computeOptions.get_end_year(),
+                             clim_month=computeOptions.get_clim_month(),
+                             spark_master=spark_master,
+                             spark_nexecs=spark_nexecs,
+                             spark_nparts=spark_nparts)
+        self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0))
+        self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59))
+
+        if 'CLIM' in self._ds:
+            raise NexusProcessingException(reason="Cannot compute Latitude/Longitude Time Average map on a climatology",
+                                           code=400)
+
+        nexus_tiles = self._find_global_tile_set()
+        # print 'tiles:'
+        # for tile in nexus_tiles:
+        #     print tile.granule
+        #     print tile.section_spec
+        #     print 'lat:', tile.latitudes
+        #     print 'lon:', tile.longitudes
+
+        #                                                          nexus_tiles)
+        if len(nexus_tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+        # for tile in nexus_tiles:
+        #    print 'lats: ', tile.latitudes.compressed()
+        #    print 'lons: ', tile.longitudes.compressed()
+        self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+        nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+        nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+        self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+        self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
+                                                              self._maxLatCent))
+        self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
+                                                              self._maxLonCent))
+
+        # Create array of tuples to pass to Spark map function
+        nexus_tiles_spark = [[self._find_tile_bounds(t),
+                              self._startTime, self._endTime,
+                              self._ds] for t in nexus_tiles]
+        # print 'nexus_tiles_spark = ', nexus_tiles_spark
+        # Remove empty tiles (should have bounds set to None)
+        bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+        for i in np.flipud(bad_tile_inds):
+            del nexus_tiles_spark[i]
+        num_nexus_tiles_spark = len(nexus_tiles_spark)
+        self.log.debug('Created {0} spark tiles'.format(num_nexus_tiles_spark))
+
+        # Expand Spark map tuple array by duplicating each entry N times,
+        # where N is the number of ways we want the time dimension carved up.
+        # (one partition per year in this case).
+        num_years = self._endYear - self._startYear + 1
+        nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_years, axis=0)
+        self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+        # Set the time boundaries for each of the Spark map tuples.
+        # Every Nth element in the array gets the same time bounds.
+        spark_part_time_ranges = \
+            np.repeat(np.array([[timegm((y, self._climMonth, 1, 0, 0, 0)),
+                                 timegm((y, self._climMonth,
+                                         monthrange(y, self._climMonth)[1],
+                                         23, 59, 59))]
+                                for y in range(self._startYear,
+                                               self._endYear + 1)]),
+                      num_nexus_tiles_spark,
+                      axis=0).reshape((len(nexus_tiles_spark), 2))
+        self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+        nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+        # print 'nexus_tiles_spark final = '
+        # for i in range(len(nexus_tiles_spark)):
+        #    print nexus_tiles_spark[i]
+
+        # Launch Spark computations
+        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        sum_count_part = rdd.map(self._map)
+        sum_count = \
+            sum_count_part.combineByKey(lambda val: val,
+                                        lambda x, val: (x[0] + val[0],
+                                                        x[1] + val[1]),
+                                        lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        avg_tiles = \
+            sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
+                          (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
+                          if (cnt_tile[y, x] > 0) else 0.,
+                                      'cnt': cnt_tile[y, x]}
+                                     for x in
+                                     range(sum_tile.shape[1])]
+                                    for y in
+                                    range(sum_tile.shape[0])])).collect()
+
+        # Combine subset results to produce global map.
+        #
+        # The tiles below are NOT Nexus objects.  They are tuples
+        # with the time avg map data and lat-lon bounding box.
+        a = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+        n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+        for tile in avg_tiles:
+            if tile is not None:
+                ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+                 tile_stats) = tile
+                tile_data = np.ma.array(
+                    [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_cnt = np.array(
+                    [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_data.mask = ~(tile_cnt.astype(bool))
+                y0 = self._lat2ind(tile_min_lat)
+                y1 = y0 + tile_data.shape[0] - 1
+                x0 = self._lon2ind(tile_min_lon)
+                x1 = x0 + tile_data.shape[1] - 1
+                if np.any(np.logical_not(tile_data.mask)):
+                    self.log.debug(
+                        'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat,
+                                                                                                     tile_max_lat,
+                                                                                                     tile_min_lon,
+                                                                                                     tile_max_lon, y0,
+                                                                                                     y1, x0, x1))
+                    a[y0:y1 + 1, x0:x1 + 1] = tile_data
+                    n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+                else:
+                    self.log.debug(
+                        'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(
+                            tile_min_lat, tile_max_lat,
+                            tile_min_lon, tile_max_lon,
+                            y0, y1, x0, x1))
+
+        # Store global map in a NetCDF file.
+        self._create_nc_file(a, 'clmap.nc', 'val')
+
+        # Create dict for JSON response
+        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+                     'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+                    for x in range(a.shape[1])] for y in range(a.shape[0])]
+
+        return ClimMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+
+
+class ClimMapSparkResults(NexusResults):
+    def __init__(self, results=None, meta=None, computeOptions=None):
+        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/CorrMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
new file mode 100644
index 0000000..32f63e6
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -0,0 +1,316 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import json
+import logging
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+# from time import time
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
+
+
+@nexus_handler
+class CorrMapSparkHandlerImpl(SparkHandler):
+    name = "Correlation Map Spark"
+    path = "/corrMapSpark"
+    description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        SparkHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+        # self.log.setLevel(logging.DEBUG)
+
+    @staticmethod
+    def _map(tile_in):
+        # Unpack input
+        tile_bounds, start_time, end_time, ds = tile_in
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+
+        # Create arrays to hold intermediate results during
+        # correlation coefficient calculation.
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+        sumx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+        sumy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+        sumxx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+        sumyy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+        sumxy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+        n_tile = np.zeros(tile_inbounds_shape, dtype=np.uint32)
+
+        # Can only retrieve some number of days worth of data from Solr
+        # at a time.  Set desired value here.
+        days_at_a_time = 90
+        # days_at_a_time = 30
+        # days_at_a_time = 7
+        # days_at_a_time = 1
+        # print 'days_at_a_time = ', days_at_a_time
+        t_incr = 86400 * days_at_a_time
+
+        tile_service = NexusTileService()
+
+        # Compute the intermediate summations needed for the Pearson 
+        # Correlation Coefficient.  We use a one-pass online algorithm
+        # so that not all of the data needs to be kept in memory all at once.
+        t_start = start_time
+        while t_start <= end_time:
+            t_end = min(t_start + t_incr, end_time)
+            # t1 = time()
+            # print 'nexus call start at time %f' % t1
+            # sys.stdout.flush()
+            ds1tiles = tile_service.get_tiles_bounded_by_box(min_lat,
+                                                             max_lat,
+                                                             min_lon,
+                                                             max_lon,
+                                                             ds[0],
+                                                             t_start,
+                                                             t_end)
+            ds2tiles = tile_service.get_tiles_bounded_by_box(min_lat,
+                                                             max_lat,
+                                                             min_lon,
+                                                             max_lon,
+                                                             ds[1],
+                                                             t_start,
+                                                             t_end)
+            # t2 = time()
+            # print 'nexus call end at time %f' % t2
+            # print 'secs in nexus call: ', t2-t1
+            # sys.stdout.flush()
+
+            len1 = len(ds1tiles)
+            len2 = len(ds2tiles)
+            # print 't %d to %d - Got %d and %d tiles' % (t_start, t_end,
+            #                                            len1, len2)
+            # sys.stdout.flush()
+            i1 = 0
+            i2 = 0
+            time1 = 0
+            time2 = 0
+            while i1 < len1 and i2 < len2:
+                tile1 = ds1tiles[i1]
+                tile2 = ds2tiles[i2]
+                # print 'tile1.data = ',tile1.data
+                # print 'tile2.data = ',tile2.data
+                # print 'i1, i2, t1, t2 times: ', i1, i2, tile1.times[0], tile2.times[0]
+                assert tile1.times[0] >= time1, 'DS1 time out of order!'
+                assert tile2.times[0] >= time2, 'DS2 time out of order!'
+                time1 = tile1.times[0]
+                time2 = tile2.times[0]
+                # print 'i1=%d,i2=%d,time1=%d,time2=%d'%(i1,i2,time1,time2)
+                if time1 < time2:
+                    i1 += 1
+                    continue
+                elif time2 < time1:
+                    i2 += 1
+                    continue
+                assert (time1 == time2), \
+                    "Mismatched tile times %d and %d" % (time1, time2)
+                # print 'processing time:',time1,time2
+                t1_data = tile1.data.data
+                t1_mask = tile1.data.mask
+                t2_data = tile2.data.data
+                t2_mask = tile2.data.mask
+                t1_data = np.nan_to_num(t1_data)
+                t2_data = np.nan_to_num(t2_data)
+                joint_mask = ((~t1_mask).astype(np.uint8) *
+                              (~t2_mask).astype(np.uint8))
+                # print 'joint_mask=',joint_mask
+                sumx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                              joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+                # print 'sumx_tile=',sumx_tile
+                sumy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                              joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+                # print 'sumy_tile=',sumy_tile
+                sumxx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+                # print 'sumxx_tile=',sumxx_tile
+                sumyy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+                # print 'sumyy_tile=',sumyy_tile
+                sumxy_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+                               joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+                # print 'sumxy_tile=',sumxy_tile
+                n_tile += joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]
+                # print 'n_tile=',n_tile
+                i1 += 1
+                i2 += 1
+            t_start = t_end + 1
+
+        # print 'Finished tile', tile_bounds
+        # sys.stdout.flush()
+        return ((min_lat, max_lat, min_lon, max_lon), (sumx_tile, sumy_tile,
+                                                       sumxx_tile, sumyy_tile,
+                                                       sumxy_tile, n_tile))
+
+    def calc(self, computeOptions, **args):
+
+        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+        self._setQueryParams(computeOptions.get_dataset(),
+                             (float(computeOptions.get_min_lat()),
+                              float(computeOptions.get_max_lat()),
+                              float(computeOptions.get_min_lon()),
+                              float(computeOptions.get_max_lon())),
+                             computeOptions.get_start_time(),
+                             computeOptions.get_end_time(),
+                             spark_master=spark_master,
+                             spark_nexecs=spark_nexecs,
+                             spark_nparts=spark_nparts)
+
+        self.log.debug('ds = {0}'.format(self._ds))
+        if not len(self._ds) == 2:
+            raise NexusProcessingException(
+                reason="Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2",
+                code=400)
+        if next(iter([clim for clim in self._ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(reason="Cannot compute correlation on a climatology", code=400)
+
+        nexus_tiles = self._find_global_tile_set()
+        # print 'tiles:'
+        # for tile in nexus_tiles:
+        #     print tile.granule
+        #     print tile.section_spec
+        #     print 'lat:', tile.latitudes
+        #     print 'lon:', tile.longitudes
+
+        #                                                          nexus_tiles)
+        if len(nexus_tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+        self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+        nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+        nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+        self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+
+        # Create array of tuples to pass to Spark map function
+        nexus_tiles_spark = [[self._find_tile_bounds(t),
+                              self._startTime, self._endTime,
+                              self._ds] for t in nexus_tiles]
+
+        # Remove empty tiles (should have bounds set to None)
+        bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+        for i in np.flipud(bad_tile_inds):
+            del nexus_tiles_spark[i]
+
+        # Expand Spark map tuple array by duplicating each entry N times,
+        # where N is the number of ways we want the time dimension carved up.
+        num_time_parts = 72
+        # num_time_parts = 2
+        # num_time_parts = 1
+        nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
+        self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+        # Set the time boundaries for each of the Spark map tuples.
+        # Every Nth element in the array gets the same time bounds.
+        spark_part_times = np.linspace(self._startTime, self._endTime + 1,
+                                       num_time_parts + 1, dtype=np.int64)
+
+        spark_part_time_ranges = \
+            np.repeat([[[spark_part_times[i],
+                         spark_part_times[i + 1] - 1] for i in range(num_time_parts)]],
+                      len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
+        self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+        nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+        # print 'nexus_tiles_spark final = '
+        # for i in range(len(nexus_tiles_spark)):
+        #    print nexus_tiles_spark[i]
+
+        # Launch Spark computations
+        # print 'nexus_tiles_spark=',nexus_tiles_spark
+        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        sum_tiles_part = rdd.map(self._map)
+        # print "sum_tiles_part = ",sum_tiles_part.collect()
+        sum_tiles = \
+            sum_tiles_part.combineByKey(lambda val: val,
+                                        lambda x, val: (x[0] + val[0],
+                                                        x[1] + val[1],
+                                                        x[2] + val[2],
+                                                        x[3] + val[3],
+                                                        x[4] + val[4],
+                                                        x[5] + val[5]),
+                                        lambda x, y: (x[0] + y[0],
+                                                      x[1] + y[1],
+                                                      x[2] + y[2],
+                                                      x[3] + y[3],
+                                                      x[4] + y[4],
+                                                      x[5] + y[5]))
+        # Convert the N (pixel-wise count) array for each tile to be a 
+        # NumPy masked array.  That is the last array in the tuple of 
+        # intermediate summation arrays.  Set mask to True if count is 0.
+        sum_tiles = \
+            sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx,
+            sum_yy, sum_xy, n)):
+                          (bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy,
+                                    np.ma.array(n,
+                                                mask=~(n.astype(bool))))))
+
+        # print 'sum_tiles = ',sum_tiles.collect()
+
+        # For each pixel in each tile compute an array of Pearson 
+        # correlation coefficients.  The map function is called once 
+        # per tile.  The result of this map operation is a list of 3-tuples of
+        # (bounds, r, n) for each tile (r=Pearson correlation coefficient
+        # and n=number of input values that went into each pixel with 
+        # any masked values not included).
+        corr_tiles = \
+            sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx, sum_yy,
+            sum_xy, n)):
+                          (bounds,
+                           np.ma.array(((sum_xy - sum_x * sum_y / n) /
+                                        np.sqrt((sum_xx - sum_x * sum_x / n) *
+                                                (sum_yy - sum_y * sum_y / n))),
+                                       mask=~(n.astype(bool))),
+                           n)).collect()
+
+        r = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+        n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+
+        # The tiles below are NOT Nexus objects.  They are tuples
+        # with the following for each correlation map subset:
+        # (1) lat-lon bounding box, (2) array of correlation r values, 
+        # and (3) array of count n values.
+        for tile in corr_tiles:
+            ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+             tile_data, tile_cnt) = tile
+            y0 = self._lat2ind(tile_min_lat)
+            y1 = self._lat2ind(tile_max_lat)
+            x0 = self._lon2ind(tile_min_lon)
+            x1 = self._lon2ind(tile_max_lon)
+            self.log.debug(
+                'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, tile_max_lat,
+                                                                                             tile_min_lon, tile_max_lon,
+                                                                                             y0, y1, x0, x1))
+            r[y0:y1 + 1, x0:x1 + 1] = tile_data
+            n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+
+        # Store global map in a NetCDF file.
+        self._create_nc_file(r, 'corrmap.nc', 'r')
+
+        # Create dict for JSON response
+        results = [[{'r': r[y, x], 'cnt': int(n[y, x]),
+                     'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+                    for x in range(r.shape[1])] for y in range(r.shape[0])]
+
+        return CorrelationResults(results)
+
+
+class CorrelationResults(NexusResults):
+    def __init__(self, results):
+        NexusResults.__init__(self)
+        self.results = results
+
+    def toJson(self):
+        json_d = {
+            "stats": {},
+            "meta": [None, None],
+            "data": self.results
+        }
+        return json.dumps(json_d, indent=4)