Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:24 UTC
[47/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/fetchedgeimpl.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
new file mode 100644
index 0000000..d4a6c4a
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
@@ -0,0 +1,201 @@
+import json
+import traceback
+from datetime import datetime
+from multiprocessing.pool import ThreadPool
+
+import requests
+
+import geo
+import values
+from webservice.webmodel import NexusProcessingException
+
+
+def __parseDatetime(dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return time
+
+
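+# Parse a location string, either "Point(lon lat)" or "lat,lon", into a (latitude, longitude) tuple.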
+def __parseLocation(locString):
+ if "Point" in locString:
+ locString = locString[6:-1]
+
+ if "," in locString:
+ latitude = float(locString.split(",")[0])
+ longitude = float(locString.split(",")[1])
+ else:
+ latitude = float(locString.split(" ")[1])
+ longitude = float(locString.split(" ")[0])
+
+ return (latitude, longitude)
+
+
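+# Normalize a raw EDGE result: time to epoch milliseconds, point to x/y, id prefixed, and
+# device/platform/mission ids resolved to human-readable descriptions.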
+def __resultRawToUsable(resultdict):
+ resultdict["time"] = __parseDatetime(resultdict["time"])
+ latitude, longitude = __parseLocation(resultdict["point"])
+
+ resultdict["x"] = longitude
+ resultdict["y"] = latitude
+
+ if "id" not in resultdict and "metadata" in resultdict:
+ resultdict["id"] = resultdict["metadata"]
+
+ resultdict["id"] = "id-%s" % resultdict["id"]
+
+ if "device" in resultdict:
+ resultdict["device"] = values.getDeviceById(resultdict["device"])
+
+ if "platform" in resultdict:
+ resultdict["platform"] = values.getPlatformById(resultdict["platform"])
+
+ if "mission" in resultdict:
+ resultdict["mission"] = values.getMissionById(resultdict["mission"])
+
+ if "sea_surface_temperature" in resultdict:
+ resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
+ del resultdict["sea_surface_temperature"]
+
+ return resultdict
+
+
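+# Fetch and parse JSON from url, retrying up to maxtries times on non-200 responses or unparseable bodies.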
+def __fetchJson(url, params, trycount=1, maxtries=5):
+ if trycount > maxtries:
+ raise Exception("Maximum retries attempted.")
+ if trycount > 1:
+ print "Retry #", trycount
+ r = requests.get(url, params=params, timeout=500.000)
+
+ print r.url
+
+ if r.status_code != 200:
+ return __fetchJson(url, params, trycount + 1, maxtries)
+ try:
+ results = json.loads(r.text)
+ return results
+ except ValueError:
+ return __fetchJson(url, params, trycount + 1, maxtries)
+
+
+def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0, platforms=None,
+ pageCallback=None):
+ params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
+ "startIndex": startIndex, "stats": "true"}
+
+ if depth_min is not None:
+ params['minDepth'] = depth_min
+ if depth_max is not None:
+ params['maxDepth'] = depth_max
+
+ if platforms is not None:
+ params["platform"] = platforms.split(",")
+
+ resultsRaw = __fetchJson(endpoint["url"], params)
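+ # Bounds start inverted (north=-90, south=90, west=180, east=-180) so the first tested coordinate becomes the initial extent.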
+ boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+ # Apply the lat/lon stats fields, when present, so count-only queries (itemsPerPage=0)
+ # still constrain the bounds even though they return no individual result records.
+ if "stats_fields" in resultsRaw:
+ stats = resultsRaw["stats_fields"]
+ if "lat" in stats and "lon" in stats:
+ boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
+ west=stats['lon']['min'], east=stats['lon']['max'])
+
+ if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0:  # Double-sanity check
+ return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer
+
+ try:
+ results = []
+ for resultdict in resultsRaw["results"]:
+ result = __resultRawToUsable(resultdict)
+ result["source"] = endpoint["name"]
+ boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
+ results.append(result)
+
+ if pageCallback is not None:
+ pageCallback(results)
+
+ '''
+ If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+ '''
+ if pageCallback is None:
+ return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+ resultsRaw["itemsPerPage"]), boundsConstrainer
+ else:
+ return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+ resultsRaw["itemsPerPage"]), boundsConstrainer
+ except:
+ print "Invalid or missing JSON in response."
+ traceback.print_exc()
+ raise NexusProcessingException(reason="Invalid or missing JSON in response.")
+ # return [], 0, startIndex, itemsPerPage, boundsConstrainer
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ startIndex = 0
+ pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+ endTime, bbox,
+ depth_min, depth_max, 0,
+ startIndex, platforms)
+ return totalResults, boundsConstrainer
+
+
+def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
+ results = []
+ startIndex = 0
+
+ mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+ # The first request isn't parallel so we can get the total results, server-forced items per page, etc.
+ pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+ endTime, bbox,
+ depth_min, depth_max,
+ endpoint["itemsPerPage"],
+ startIndex, platforms,
+ pageCallback)
+ results = results + pageResults
+ mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)
+
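+ # Fetch the remaining pages in parallel, one task per successive startIndex offset.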
+ pool = ThreadPool(processes=endpoint["fetchThreads"])
+ mpResults = [pool.apply_async(__doQuery, args=(
+ endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
+ range(len(pageResults), totalResults, itemsPerPageR)]
+ pool.close()
+ pool.join()
+
+ '''
+ If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+ '''
+ if pageCallback is None:
+ mpResults = [p.get() for p in mpResults]
+ for mpResult in mpResults:
+ results = results + mpResult[0]
+ mainBoundsConstrainer.testOtherConstrainer(mpResult[4])
+
+ return results, mainBoundsConstrainer
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+ results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+ if placeholders:
+ trimmedResults = []
+ for item in results:
+ depth = None
+ if "depth" in item:
+ depth = item["depth"]
+ if "sea_water_temperature_depth" in item:
+ depth = item["sea_water_temperature_depth"]
+
+ trimmedItem = {
+ "x": item["x"],
+ "y": item["y"],
+ "source": item["source"],
+ "time": item["time"],
+ "device": item["device"] if "device" in item else None,
+ "platform": item["platform"],
+ "depth": depth
+ }
+ trimmedResults.append(trimmedItem)
+
+ results = trimmedResults
+
+ return results, boundsConstrainer
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/geo.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py
new file mode 100644
index 0000000..ece1788
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/geo.py
@@ -0,0 +1,113 @@
+import math
+
+
+MEAN_RADIUS_EARTH_METERS = 6371010.0
+EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0
+POLAR_RADIUS_EARTH_METERS = 6356752.0
+FLATTENING_EARTH = 298.257223563  # Inverse flattening (1/f), WGS-84
+MEAN_RADIUS_EARTH_MILES = 3958.8
+
+
+class DistanceUnit(object):
+ METERS = 0
+ MILES = 1
+
+
+# Haversine implementation for great-circle distances between two points
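+# Here x is latitude and y is longitude:
+#   a = sin^2(dx/2) + cos(x0)*cos(x1)*sin^2(dy/2),  d = 2*R*atan2(sqrt(a), sqrt(1-a))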
+def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS):
+ if units == DistanceUnit.METERS:
+ R = MEAN_RADIUS_EARTH_METERS
+ elif units == DistanceUnit.MILES:
+ R = MEAN_RADIUS_EARTH_MILES
+ else:
+ raise Exception("Invalid units specified")
+ x0r = x0 * (math.pi / 180.0) # To radians
+ x1r = x1 * (math.pi / 180.0) # To radians
+ xd = (x1 - x0) * (math.pi / 180.0)
+ yd = (y1 - y0) * (math.pi / 180.0)
+
+ a = math.sin(xd/2.0) * math.sin(xd/2.0) + \
+ math.cos(x0r) * math.cos(x1r) * \
+ math.sin(yd / 2.0) * math.sin(yd / 2.0)
+ c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
+ d = R * c
+ return d
+
+
+# Equirectangular approximation for when performance is key. Most accurate over small distances.
+def equirectangularApprox(x0, y0, x1, y1):
+ R = 6371000.0 # Meters
+ x0r = x0 * (math.pi / 180.0) # To radians
+ x1r = x1 * (math.pi / 180.0)
+ y0r = y0 * (math.pi / 180.0)
+ y1r = y1 * (math.pi / 180.0)
+
+ x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0)
+ y = x1r - x0r
+ d = math.sqrt(x*x + y*y) * R
+ return d
+
+
+
+class BoundingBox(object):
+
+ def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+ if asString is not None:
+ bboxParts = asString.split(",")
+ self.west = float(bboxParts[0])
+ self.south = float(bboxParts[1])
+ self.east = float(bboxParts[2])
+ self.north = float(bboxParts[3])
+ else:
+ self.north = north
+ self.south = south
+ self.west = west
+ self.east = east
+
+ def toString(self):
+ return "%s,%s,%s,%s"%(self.west, self.south, self.east, self.north)
+
+ def toMap(self):
+ return {
+ "xmin": self.west,
+ "xmax": self.east,
+ "ymin": self.south,
+ "ymax": self.north
+ }
+
+class BoundsConstrainer(BoundingBox):
+ '''
+ Accumulates a bounding envelope: each tested coordinate can only widen the bounds, never shrink them.
+ Initialize with inverted bounds (e.g. north=-90, south=90) so the first tested coordinate defines the extent.
+ '''
+
+ def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+ BoundingBox.__init__(self, north, south, west, east, asString)
+
+ def testNorth(self, v):
+ if v is None:
+ return
+ self.north = max([self.north, v])
+
+ def testSouth(self, v):
+ if v is None:
+ return
+ self.south = min([self.south, v])
+
+ def testEast(self, v):
+ if v is None:
+ return
+ self.east = max([self.east, v])
+
+ def testWest(self, v):
+ if v is None:
+ return
+ self.west = min([self.west, v])
+
+ def testCoords(self, north=None, south=None, west=None, east=None):
+ self.testNorth(north)
+ self.testSouth(south)
+ self.testWest(west)
+ self.testEast(east)
+
+ def testOtherConstrainer(self, other):
+ self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/histogramplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py
new file mode 100644
index 0000000..a413769
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/histogramplot.py
@@ -0,0 +1,117 @@
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import numpy as np
+import matplotlib
+matplotlib.use('Agg')  # The non-interactive backend must be selected before pyplot is imported
+import matplotlib.pyplot as plt
+import matplotlib.mlab as mlab
+
+import BaseDomsHandler
+import ResultsStorage
+
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$C)",
+ "sss": "(g/L)"
+}
+
+class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+ def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__x = x
+ self.__parameter = parameter
+ self.__plot = plot
+
+ def toImage(self):
+ return self.__plot
+
+
+def render(d, x, primary, secondary, parameter, norm_and_curve=False):
+
+ fig, ax = plt.subplots()
+ fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold')
+
+
+
+ n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75)
+
+
+ if norm_and_curve:
+ mean = np.mean(x)
+ variance = np.var(x)
+ sigma = np.sqrt(variance)
+ y = mlab.normpdf(bins, mean, sigma)
+ plt.plot(bins, y, 'r--', linewidth=1)
+
+ ax.set_title('n = %d' % len(x))
+
+ units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ ax.set_xlabel("%s - %s %s" % (primary, secondary, units))
+
+ if norm_and_curve:
+ ax.set_ylabel("Probability per unit difference")
+ else:
+ ax.set_ylabel("Frequency")
+
+ plt.grid(True)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ d['plot'] = sio.getvalue()
+
+
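+# Render the plot in a separate process; the PNG bytes are returned through a multiprocessing Manager dict.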
+def renderAsync(x, primary, secondary, parameter, norm_and_curve):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+def createHistogramPlot(id, parameter, norm_and_curve=False):
+
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ x = createHistTable(data, secondary, parameter)
+
+ plot = renderAsync(x, primary, secondary, parameter, norm_and_curve)
+
+ r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary,
+ args=params, details=stats,
+ bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
+
+
+def createHistTable(results, secondary, parameter):
+
+ x = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in results:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ x.append((a - b))
+
+ return x
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/insitusubset.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py
new file mode 100644
index 0000000..c688530
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/insitusubset.py
@@ -0,0 +1,248 @@
+import StringIO
+import csv
+import json
+import logging
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.webmodel import NexusProcessingException, NoDataException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler):
+ name = "DOMS In Situ Subsetter"
+ path = "/domsinsitusubset"
+ description = "Subset a DOMS in situ source given the search domain."
+
+ params = [
+ {
+ "name": "source",
+ "type": "comma-delimited string",
+ "description": "The in situ Dataset to be sub-setted",
+ "required": "true",
+ "sample": "spurs"
+ },
+ {
+ "name": "parameter",
+ "type": "string",
+ "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
+ "required": "false",
+ "default": "All",
+ "sample": "sss"
+ },
+ {
+ "name": "startTime",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+ "required": "true",
+ "sample": "2013-10-21T00:00:00Z"
+ },
+ {
+ "name": "endTime",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+ "required": "true",
+ "sample": "2013-10-31T23:59:59Z"
+ },
+ {
+ "name": "b",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ "required": "true",
+ "sample": "-30,15,-45,30"
+ },
+ {
+ "name": "depthMin",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax",
+ "required": "false",
+ "default": "No limit",
+ "sample": "0"
+ },
+ {
+ "name": "depthMax",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin",
+ "required": "false",
+ "default": "No limit",
+ "sample": "5"
+ },
+ {
+ "name": "platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for subset consideration",
+ "required": "false",
+ "default": "All",
+ "sample": "1,2,3,4,5,6,7,8,9"
+ },
+ {
+ "name": "output",
+ "type": "string",
+ "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
+ "required": "false",
+ "default": "JSON",
+ "sample": "CSV"
+ }
+ ]
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ source_name = request.get_argument('source', None)
+ if source_name is None or source_name.strip() == '':
+ raise NexusProcessingException(reason="'source' argument is required", code=400)
+
+ parameter_s = request.get_argument('parameter', None)
+ if parameter_s not in ['sst', 'sss', 'wind', None]:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ platforms = request.get_argument('platforms', None)
+ if platforms is not None:
+ try:
+ p_validation = platforms.split(',')
+ p_validation = [int(p) for p in p_validation]
+ del p_validation
+ except:
+ raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms
+
+ def calc(self, request, **args):
+
+ source_name, parameter_s, start_time, end_time, bounding_polygon, \
+ depth_min, depth_max, platforms = self.parse_arguments(request)
+
+ with requests.session() as edge_session:
+ edge_results = query_edge(source_name, parameter_s, start_time, end_time,
+ ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ platforms, depth_min, depth_max, edge_session)['results']
+
+ if len(edge_results) == 0:
+ raise NoDataException
+ return InSituSubsetResult(results=edge_results)
+
+
+class InSituSubsetResult(object):
+ def __init__(self, results):
+ self.results = results
+
+ def toJson(self):
+ return json.dumps(self.results, indent=4)
+
+ def toCSV(self):
+ fieldnames = sorted(next(iter(self.results)).keys())
+
+ csv_mem_file = StringIO.StringIO()
+ try:
+ writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)
+
+ writer.writeheader()
+ writer.writerows(self.results)
+ csv_out = csv_mem_file.getvalue()
+ finally:
+ csv_mem_file.close()
+
+ return csv_out
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000,
+ startIndex=0, stats=True):
+ log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')
+ try:
+ startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ platform = platform.split(',')
+ except AttributeError:
+ # Assume we were passed a list
+ pass
+
+ params = {"startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "minDepth": depth_min,
+ "maxDepth": depth_max,
+ "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+ if variable:
+ params['variable'] = variable
+ if platform:
+ params['platform'] = platform
+
+ edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+ edge_request.raise_for_status()
+ edge_response = json.loads(edge_request.text)
+
+ # Page through the remaining results by following each response's 'next' link
+ next_page_url = edge_response.get('next', None)
+ while next_page_url is not None:
+ log.debug("requesting %s" % next_page_url)
+ edge_page_request = session.get(next_page_url)
+
+ edge_page_request.raise_for_status()
+ edge_page_response = json.loads(edge_page_request.text)
+
+ edge_response['results'].extend(edge_page_response['results'])
+
+ next_page_url = edge_page_response.get('next', None)
+
+ return edge_response
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/mapplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py
new file mode 100644
index 0000000..47870a7
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/mapplot.py
@@ -0,0 +1,171 @@
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import numpy as np
+import matplotlib
+matplotlib.use('Agg')  # The non-interactive backend must be selected before pyplot/Basemap are imported
+import matplotlib.pyplot as plt
+from mpl_toolkits.basemap import Basemap
+
+import BaseDomsHandler
+import ResultsStorage
+
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$ C)",
+ "sss": "(g/L)"
+}
+
+
+def __square(minLon, maxLon, minLat, maxLat):
+ if maxLat - minLat > maxLon - minLon:
+ a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0
+ minLon -= a
+ maxLon += a
+ elif maxLon - minLon > maxLat - minLat:
+ a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0
+ minLat -= a
+ maxLat += a
+
+ return minLon, maxLon, minLat, maxLat
+
+
+def render(d, lats, lons, z, primary, secondary, parameter):
+ fig = plt.figure()
+ ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
+
+
+ ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+ # ax.set_ylabel('Latitude')
+ # ax.set_xlabel('Longitude')
+
+ minLatA = np.min(lats)
+ maxLatA = np.max(lats)
+ minLonA = np.min(lons)
+ maxLonA = np.max(lons)
+
+ minLat = minLatA - (abs(maxLatA - minLatA) * 0.1)
+ maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1)
+
+ minLon = minLonA - (abs(maxLonA - minLonA) * 0.1)
+ maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1)
+
+ minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat)
+
+
+ # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l')
+ m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat,
+ resolution='l')
+
+
+ m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10)
+ m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10)
+
+ m.drawcoastlines()
+ m.drawmapboundary(fill_color='#99ffff')
+ m.fillcontinents(color='#cc9966', lake_color='#99ffff')
+
+ #lats, lons = np.meshgrid(lats, lons)
+
+ masked_array = np.ma.array(z, mask=np.isnan(z))
+ z = masked_array
+
+
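+ # Scale marker sizes linearly into the 10-30 point range based on each value's position in the data range.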
+ values = np.zeros(len(z))
+ for i in range(0, len(z)):
+ values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10
+
+ x, y = m(lons, lats)
+
+ im1 = m.scatter(x, y, values)
+
+
+ im1.set_array(z)
+ cb = m.colorbar(im1)
+
+ units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ cb.set_label("Difference %s" % units)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ plot = sio.getvalue()
+ if d is not None:
+ d['plot'] = plot
+ return plot
+
+
+
+class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+ def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+ self.__lats = lats
+ self.__lons = lons
+ self.__z = np.array(z)
+ self.__parameter = parameter
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__plot = plot
+
+
+
+ def toImage(self):
+ return self.__plot
+
+
+
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+
+def createMapPlot(id, parameter):
+
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ lats = []
+ lons = []
+ z = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in data:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ z.append((a - b))
+ z.append((a - b))
+ else:
+ z.append(1.0)
+ z.append(1.0)
+ lats.append(entry["y"])
+ lons.append(entry["x"])
+ lats.append(match["y"])
+ lons.append(match["x"])
+
+ plot = renderAsync(lats, lons, z, primary, secondary, parameter)
+ r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary,
+ args=params,
+ details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/scatterplot.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py
new file mode 100644
index 0000000..0cbe110
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/scatterplot.py
@@ -0,0 +1,112 @@
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import matplotlib
+matplotlib.use('Agg')  # The non-interactive backend must be selected before pyplot is imported
+import matplotlib.pyplot as plt
+
+import BaseDomsHandler
+import ResultsStorage
+
+
+
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$ C)",
+ "sss": "(g/L)"
+}
+
+
+
+def render(d, x, y, z, primary, secondary, parameter):
+ fig, ax = plt.subplots()
+
+ ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+
+ units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ ax.set_ylabel("%s %s" % (secondary, units))
+ ax.set_xlabel("%s %s" % (primary, units))
+
+ ax.scatter(x, y)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ d['plot'] = sio.getvalue()
+
+class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+ def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId)
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__x = x
+ self.__y = y
+ self.__z = z
+ self.__parameter = parameter
+ self.__plot = plot
+
+ def toImage(self):
+ return self.__plot
+
+
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+
+def createScatterPlot(id, parameter):
+
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ x, y, z = createScatterTable(data, secondary, parameter)
+
+ plot = renderAsync(x, y, z, primary, secondary, parameter)
+
+ r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary,
+ args=params, details=stats,
+ bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
+
+
+def createScatterTable(results, secondary, parameter):
+ x = []
+ y = []
+ z = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in results:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ x.append(a)
+ y.append(b)
+ z.append(a - b)
+
+ return x, y, z
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/subsetter.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
new file mode 100644
index 0000000..6727a48
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -0,0 +1,245 @@
+import logging
+import os
+import tempfile
+import zipfile
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def is_blank(my_string):
+ return not (my_string and my_string.strip() != '')
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler):
+ name = "DOMS Subsetter"
+ path = "/domssubset"
+ description = "Subset DOMS sources given the search domain"
+
+ params = {
+ "dataset": {
+ "name": "NEXUS Dataset",
+ "type": "string",
+ "description": "The NEXUS dataset. Optional but at least one of 'dataset' or 'insitu' are required"
+ },
+ "insitu": {
+ "name": "In Situ sources",
+ "type": "comma-delimited string",
+ "description": "The in situ source(s). Optional but at least one of 'dataset' or 'insitu' are required"
+ },
+ "parameter": {
+ "name": "Data Parameter",
+ "type": "string",
+ "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "depthMin": {
+ "name": "Minimum Depth",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax. Optional"
+ },
+ "depthMax": {
+ "name": "Maximum Depth",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin. Optional"
+ },
+ "platforms": {
+ "name": "Platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for subset consideration. Optional"
+ },
+ "output": {
+ "name": "Output",
+ "type": "string",
+ "description": "Output type. Only 'ZIP' is currently supported. Required"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ primary_ds_name = request.get_argument('dataset', None)
+ matchup_ds_names = request.get_argument('insitu', None)
+
+ if is_blank(primary_ds_name) and is_blank(matchup_ds_names):
+ raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required",
+ code=400)
+
+ if matchup_ds_names is not None:
+ try:
+ matchup_ds_names = matchup_ds_names.split(',')
+ except:
+ raise NexusProcessingException(reason="'insitu' argument should be a comma-seperated list", code=400)
+
+ parameter_s = request.get_argument('parameter', None)
+ if parameter_s not in ['sst', 'sss', 'wind']:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ platforms = request.get_argument('platforms', None)
+ if platforms is not None:
+ try:
+ p_validation = platforms.split(',')
+ p_validation = [int(p) for p in p_validation]
+ del p_validation
+ except:
+ raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+ bounding_polygon, depth_min, depth_max, platforms
+
+ def calc(self, request, **args):
+
+ primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+ bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request)
+
+ primary_url = "https://doms.jpl.nasa.gov/datainbounds"
+ primary_params = {
+ 'ds': primary_ds_name,
+ 'parameter': parameter_s,
+ 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ 'startTime': start_time,
+ 'endTime': end_time,
+ 'output': "CSV"
+ }
+
+ matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset"
+ matchup_params = {
+ 'source': None,
+ 'parameter': parameter_s,
+ 'startTime': start_time,
+ 'endTime': end_time,
+ 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ 'depthMin': depth_min,
+ 'depthMax': depth_max,
+ 'platforms': platforms,
+ 'output': 'CSV'
+ }
+
+ primary_temp_file_path = None
+ matchup_downloads = None
+
+ with requests.session() as session:
+
+ if not is_blank(primary_ds_name):
+ # Download primary
+ primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv')
+ os.close(primary_temp_file)  # only the path is used; close the descriptor returned by mkstemp
+ download_file(primary_url, primary_temp_file_path, session, params=primary_params)
+
+ if matchup_ds_names:  # may be None when only a primary dataset was requested
+ # Download matchup
+ matchup_downloads = {}
+ for matchup_ds in matchup_ds_names:
+ matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv')
+ os.close(matchup_downloads[matchup_ds][0])  # only the path is used; close the descriptor
+ matchup_params['source'] = matchup_ds
+ download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params)
+
+ # Zip downloads
+ date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"),
+ datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"))
+ bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds
+ zip_dir = tempfile.mkdtemp()
+ zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds)
+ with zipfile.ZipFile(zip_path, 'w') as my_zip:
+ if primary_temp_file_path:
+ my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds))
+ if matchup_downloads:
+ for matchup_ds, download in matchup_downloads.iteritems():
+ my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds))
+
+ # Clean up
+ if primary_temp_file_path:
+ os.remove(primary_temp_file_path)
+ if matchup_downloads:
+ for matchup_ds, download in matchup_downloads.iteritems():
+ os.remove(download[1])
+
+ return SubsetResult(zip_path)
+
+
+class SubsetResult(object):
+ def __init__(self, zip_path):
+ self.zip_path = zip_path
+
+ def toJson(self):
+ raise NotImplementedError
+
+ def toZip(self):
+ with open(self.zip_path, 'rb') as zip_file:
+ zip_contents = zip_file.read()
+
+ return zip_contents
+
+ def cleanup(self):
+ os.remove(self.zip_path)
+
+
+def download_file(url, filepath, session, params=None):
+ r = session.get(url, params=params, stream=True)
+ with open(filepath, 'wb') as f:
+ for chunk in r.iter_content(chunk_size=1024):
+ if chunk: # filter out keep-alive new chunks
+ f.write(chunk)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/values.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py
new file mode 100644
index 0000000..5e98ce9
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/values.py
@@ -0,0 +1,57 @@
+PLATFORMS = [
+ {"id": 1, "desc": "ship"},
+ {"id": 2, "desc": "moored surface buoy"},
+ {"id": 3, "desc": "drifting surface float"},
+ {"id": 4, "desc": "drifting subsurface profiling float"},
+ {"id": 5, "desc": "autonomous underwater vehicle"},
+ {"id": 6, "desc": "offshore structure"},
+ {"id": 7, "desc": "coastal structure"},
+ {"id": 8, "desc": "towed unmanned submersible"},
+ {"id": 9, "desc": "orbiting satellite"}
+]
+
+DEVICES = [
+ {"id": 1, "desc": "bathythermographs"},
+ {"id": 2, "desc": "discrete water samplers"},
+ {"id": 3, "desc": "CTD"},
+ {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"},
+ {"id": 5, "desc": "radiometers"},
+ {"id": 6, "desc": "scatterometers"}
+]
+
+MISSIONS = [
+ {"id": 1, "desc": "SAMOS"},
+ {"id": 2, "desc": "ICOADS"},
+ {"id": 3, "desc": "Aquarius"},
+ {"id": 4, "desc": "SPURS1"}
+]
+
+
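+# Return the matching item's description, or the id itself when no match is found.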
+def getDescById(items, id):
+ for item in items:
+ if item["id"] == id:
+ return item["desc"]
+ return id
+
+
+def getPlatformById(id):
+ return getDescById(PLATFORMS, id)
+
+
+def getDeviceById(id):
+ return getDescById(DEVICES, id)
+
+
+def getMissionById(id):
+ return getDescById(MISSIONS, id)
+
+
+def getDescByListNameAndId(listName, id):
+ if listName.upper() == "PLATFORM":
+ return getPlatformById(id)
+ elif listName.upper() == "DEVICE":
+ return getDeviceById(id)
+ elif listName.upper() == "MISSION":
+ return getMissionById(id)
+ else:
+ raise Exception("Invalid list name specified ('%s')" % listName)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/workerthread.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py
new file mode 100644
index 0000000..097e604
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/workerthread.py
@@ -0,0 +1,51 @@
+import threading
+import time
+
+class WorkerThread(threading.Thread):
+
+ def __init__(self, method, params):
+ threading.Thread.__init__(self)
+ self.method = method
+ self.params = params
+ self.completed = False
+ self.results = None
+
+
+ def run(self):
+
+
+ self.results = self.method(*self.params)
+ self.completed = True
+
+
+def __areAllComplete(threads):
+
+ for thread in threads:
+ if not thread.completed:
+ return False
+
+ return True
+
+def wait(threads, startFirst=False, poll=0.5):
+
+ if startFirst:
+ for thread in threads:
+ thread.start()
+
+ while not __areAllComplete(threads):
+ time.sleep(poll)
+
+
+
+def foo(param1, param2):
+ print param1, param2
+ return "c"
+
+if __name__ == "__main__":
+
+ thread = WorkerThread(foo, params=("a", "b"))
+ thread.start()
+ while not thread.completed:
+ time.sleep(0.5)
+ print thread.results
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/ClimMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
new file mode 100644
index 0000000..77e3c38
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -0,0 +1,257 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import logging
+from calendar import timegm, monthrange
+from datetime import datetime
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+
+@nexus_handler
+class ClimMapSparkHandlerImpl(SparkHandler):
+ name = "Climatology Map Spark"
+ path = "/climMapSpark"
+ description = "Computes a Latitude/Longitude Time Average map for a given month given an arbitrary geographical area and year range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ SparkHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+ # self.log.setLevel(logging.DEBUG)
+
+ @staticmethod
+ def _map(tile_in_spark):
+ tile_bounds = tile_in_spark[0]
+ (min_lat, max_lat, min_lon, max_lon,
+ min_y, max_y, min_x, max_x) = tile_bounds
+ startTime = tile_in_spark[1]
+ endTime = tile_in_spark[2]
+ ds = tile_in_spark[3]
+ tile_service = NexusTileService()
+ # print 'Started tile', tile_bounds
+ # sys.stdout.flush()
+ tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+ days_at_a_time = 90
+ # days_at_a_time = 30
+ # days_at_a_time = 7
+ # days_at_a_time = 1
+ # print 'days_at_a_time = ', days_at_a_time
+ t_incr = 86400 * days_at_a_time
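+ # Accumulate per-pixel value sums and valid-observation counts across the full time range, one chunk of days at a time.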
+ sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+ cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+ t_start = startTime
+ while t_start <= endTime:
+ t_end = min(t_start + t_incr, endTime)
+ # t1 = time()
+ # print 'nexus call start at time %f' % t1
+ # sys.stdout.flush()
+ nexus_tiles = \
+ ClimMapSparkHandlerImpl.query_by_parts(tile_service,
+ min_lat, max_lat,
+ min_lon, max_lon,
+ ds,
+ t_start,
+ t_end,
+ part_dim=2)
+ # nexus_tiles = \
+ # tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+ # min_lon, max_lon,
+ # ds=ds,
+ # start_time=t_start,
+ # end_time=t_end)
+ # t2 = time()
+ # print 'nexus call end at time %f' % t2
+ # print 'secs in nexus call: ', t2-t1
+ # sys.stdout.flush()
+ # print 't %d to %d - Got %d tiles' % (t_start, t_end,
+ # len(nexus_tiles))
+ # for nt in nexus_tiles:
+ # print nt.granule
+ # print nt.section_spec
+ # print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
+ # print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
+ # sys.stdout.flush()
+
+ for tile in nexus_tiles:
+ tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+ sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+ cnt_tile += (~tile.data.mask[0,
+ min_y:max_y + 1,
+ min_x:max_x + 1]).astype(np.uint8)
+ t_start = t_end + 1
+
+ # print 'cnt_tile = ', cnt_tile
+ # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
+ # sum_tile.mask = cnt_tile.mask
+ # avg_tile = sum_tile / cnt_tile
+ # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
+ # print 'Finished tile', tile_bounds
+ # print 'Tile avg = ', avg_tile
+ # sys.stdout.flush()
+ return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
+
+ def _month_from_timestamp(self, t):
+ return datetime.utcfromtimestamp(t).month
+
+ def calc(self, computeOptions, **args):
+ """
+
+ :param computeOptions: StatsComputeOptions
+ :param args: dict
+ :return:
+ """
+
+ spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+ self._setQueryParams(computeOptions.get_dataset()[0],
+ (float(computeOptions.get_min_lat()),
+ float(computeOptions.get_max_lat()),
+ float(computeOptions.get_min_lon()),
+ float(computeOptions.get_max_lon())),
+ start_year=computeOptions.get_start_year(),
+ end_year=computeOptions.get_end_year(),
+ clim_month=computeOptions.get_clim_month(),
+ spark_master=spark_master,
+ spark_nexecs=spark_nexecs,
+ spark_nparts=spark_nparts)
+ self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0))
+ self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59))
+
+ if 'CLIM' in self._ds:
+ raise NexusProcessingException(reason="Cannot compute Latitude/Longitude Time Average map on a climatology",
+ code=400)
+
+ nexus_tiles = self._find_global_tile_set()
+ # print 'tiles:'
+ # for tile in nexus_tiles:
+ # print tile.granule
+ # print tile.section_spec
+ # print 'lat:', tile.latitudes
+ # print 'lon:', tile.longitudes
+
+ if len(nexus_tiles) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+ # for tile in nexus_tiles:
+ # print 'lats: ', tile.latitudes.compressed()
+ # print 'lons: ', tile.longitudes.compressed()
+ self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+ nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+ nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+ self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+ self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
+ self._maxLatCent))
+ self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
+ self._maxLonCent))
+
+ # Create array of tuples to pass to Spark map function
+ nexus_tiles_spark = [[self._find_tile_bounds(t),
+ self._startTime, self._endTime,
+ self._ds] for t in nexus_tiles]
+ # print 'nexus_tiles_spark = ', nexus_tiles_spark
+ # Remove empty tiles (should have bounds set to None)
+ bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+ for i in np.flipud(bad_tile_inds):
+ del nexus_tiles_spark[i]
+ num_nexus_tiles_spark = len(nexus_tiles_spark)
+ self.log.debug('Created {0} spark tiles'.format(num_nexus_tiles_spark))
+
+ # Expand Spark map tuple array by duplicating each entry N times,
+ # where N is the number of ways we want the time dimension carved up.
+ # (one partition per year in this case).
+ num_years = self._endYear - self._startYear + 1
+ nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_years, axis=0)
+ self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+ # Set the time boundaries for each of the Spark map tuples.
+ # Every Nth element in the array gets the same time bounds.
+ spark_part_time_ranges = \
+ np.repeat(np.array([[timegm((y, self._climMonth, 1, 0, 0, 0)),
+ timegm((y, self._climMonth,
+ monthrange(y, self._climMonth)[1],
+ 23, 59, 59))]
+ for y in range(self._startYear,
+ self._endYear + 1)]),
+ num_nexus_tiles_spark,
+ axis=0).reshape((len(nexus_tiles_spark), 2))
+ self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+ nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+ # print 'nexus_tiles_spark final = '
+ # for i in range(len(nexus_tiles_spark)):
+ # print nexus_tiles_spark[i]
+
+ # Launch Spark computations
+ rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ sum_count_part = rdd.map(self._map)
+ sum_count = \
+ sum_count_part.combineByKey(lambda val: val,
+ lambda x, val: (x[0] + val[0],
+ x[1] + val[1]),
+ lambda x, y: (x[0] + y[0], x[1] + y[1]))
+ avg_tiles = \
+ sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
+ (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
+ if (cnt_tile[y, x] > 0) else 0.,
+ 'cnt': cnt_tile[y, x]}
+ for x in
+ range(sum_tile.shape[1])]
+ for y in
+ range(sum_tile.shape[0])])).collect()
+
+ # Combine subset results to produce global map.
+ #
+ # The tiles below are NOT Nexus objects. They are tuples
+ # with the time avg map data and lat-lon bounding box.
+ a = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+ n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+ for tile in avg_tiles:
+ if tile is not None:
+ ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+ tile_stats) = tile
+ tile_data = np.ma.array(
+ [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+ tile_cnt = np.array(
+ [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+ tile_data.mask = ~(tile_cnt.astype(bool))
+ y0 = self._lat2ind(tile_min_lat)
+ y1 = y0 + tile_data.shape[0] - 1
+ x0 = self._lon2ind(tile_min_lon)
+ x1 = x0 + tile_data.shape[1] - 1
+ if np.any(np.logical_not(tile_data.mask)):
+ self.log.debug(
+ 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat,
+ tile_max_lat,
+ tile_min_lon,
+ tile_max_lon, y0,
+ y1, x0, x1))
+ a[y0:y1 + 1, x0:x1 + 1] = tile_data
+ n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+ else:
+ self.log.debug(
+ 'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(
+ tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon,
+ y0, y1, x0, x1))
+
+ # Store global map in a NetCDF file.
+ self._create_nc_file(a, 'clmap.nc', 'val')
+
+ # Create dict for JSON response
+ results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+ 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+ for x in range(a.shape[1])] for y in range(a.shape[0])]
+
+ return ClimMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+
+
+class ClimMapSparkResults(NexusResults):
+ def __init__(self, results=None, meta=None, computeOptions=None):
+ NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/CorrMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
new file mode 100644
index 0000000..32f63e6
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -0,0 +1,316 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import json
+import logging
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+# from time import time
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
+
+
+@nexus_handler
+class CorrMapSparkHandlerImpl(SparkHandler):
+ name = "Correlation Map Spark"
+ path = "/corrMapSpark"
+ description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ SparkHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+ # self.log.setLevel(logging.DEBUG)
+
+ @staticmethod
+ def _map(tile_in):
+ # Unpack input
+ tile_bounds, start_time, end_time, ds = tile_in
+ (min_lat, max_lat, min_lon, max_lon,
+ min_y, max_y, min_x, max_x) = tile_bounds
+
+ # Create arrays to hold intermediate results during
+ # correlation coefficient calculation.
+ tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+ sumx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+ sumy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+ sumxx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+ sumyy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+ sumxy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64)
+ n_tile = np.zeros(tile_inbounds_shape, dtype=np.uint32)
+
+ # Can only retrieve some number of days worth of data from Solr
+ # at a time. Set desired value here.
+ days_at_a_time = 90
+ # days_at_a_time = 30
+ # days_at_a_time = 7
+ # days_at_a_time = 1
+ # print 'days_at_a_time = ', days_at_a_time
+ t_incr = 86400 * days_at_a_time
+
+ tile_service = NexusTileService()
+
+ # Compute the intermediate summations needed for the Pearson
+ # Correlation Coefficient. We use a one-pass online algorithm
+ # so that not all of the data needs to be kept in memory all at once.
+ t_start = start_time
+ while t_start <= end_time:
+ t_end = min(t_start + t_incr, end_time)
+ # t1 = time()
+ # print 'nexus call start at time %f' % t1
+ # sys.stdout.flush()
+ ds1tiles = tile_service.get_tiles_bounded_by_box(min_lat,
+ max_lat,
+ min_lon,
+ max_lon,
+ ds[0],
+ t_start,
+ t_end)
+ ds2tiles = tile_service.get_tiles_bounded_by_box(min_lat,
+ max_lat,
+ min_lon,
+ max_lon,
+ ds[1],
+ t_start,
+ t_end)
+ # t2 = time()
+ # print 'nexus call end at time %f' % t2
+ # print 'secs in nexus call: ', t2-t1
+ # sys.stdout.flush()
+
+ len1 = len(ds1tiles)
+ len2 = len(ds2tiles)
+ # print 't %d to %d - Got %d and %d tiles' % (t_start, t_end,
+ # len1, len2)
+ # sys.stdout.flush()
+ i1 = 0
+ i2 = 0
+ time1 = 0
+ time2 = 0
+ while i1 < len1 and i2 < len2:
+ tile1 = ds1tiles[i1]
+ tile2 = ds2tiles[i2]
+ # print 'tile1.data = ',tile1.data
+ # print 'tile2.data = ',tile2.data
+ # print 'i1, i2, t1, t2 times: ', i1, i2, tile1.times[0], tile2.times[0]
+ assert tile1.times[0] >= time1, 'DS1 time out of order!'
+ assert tile2.times[0] >= time2, 'DS2 time out of order!'
+ time1 = tile1.times[0]
+ time2 = tile2.times[0]
+ # print 'i1=%d,i2=%d,time1=%d,time2=%d'%(i1,i2,time1,time2)
+ if time1 < time2:
+ i1 += 1
+ continue
+ elif time2 < time1:
+ i2 += 1
+ continue
+ assert (time1 == time2), \
+ "Mismatched tile times %d and %d" % (time1, time2)
+ # print 'processing time:',time1,time2
+ t1_data = tile1.data.data
+ t1_mask = tile1.data.mask
+ t2_data = tile2.data.data
+ t2_mask = tile2.data.mask
+ t1_data = np.nan_to_num(t1_data)
+ t2_data = np.nan_to_num(t2_data)
+ joint_mask = ((~t1_mask).astype(np.uint8) *
+ (~t2_mask).astype(np.uint8))
+ # print 'joint_mask=',joint_mask
+ sumx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+ # print 'sumx_tile=',sumx_tile
+ sumy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+ # print 'sumy_tile=',sumy_tile
+ sumxx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+ # print 'sumxx_tile=',sumxx_tile
+ sumyy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+ # print 'sumyy_tile=',sumyy_tile
+ sumxy_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ t2_data[0, min_y:max_y + 1, min_x:max_x + 1] *
+ joint_mask[0, min_y:max_y + 1, min_x:max_x + 1])
+ # print 'sumxy_tile=',sumxy_tile
+ n_tile += joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]
+ # print 'n_tile=',n_tile
+ i1 += 1
+ i2 += 1
+ t_start = t_end + 1
+
+ # print 'Finished tile', tile_bounds
+ # sys.stdout.flush()
+ return ((min_lat, max_lat, min_lon, max_lon), (sumx_tile, sumy_tile,
+ sumxx_tile, sumyy_tile,
+ sumxy_tile, n_tile))
+
+ def calc(self, computeOptions, **args):
+
+ spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+ self._setQueryParams(computeOptions.get_dataset(),
+ (float(computeOptions.get_min_lat()),
+ float(computeOptions.get_max_lat()),
+ float(computeOptions.get_min_lon()),
+ float(computeOptions.get_max_lon())),
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time(),
+ spark_master=spark_master,
+ spark_nexecs=spark_nexecs,
+ spark_nparts=spark_nparts)
+
+ self.log.debug('ds = {0}'.format(self._ds))
+ if not len(self._ds) == 2:
+ raise NexusProcessingException(
+ reason="Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2",
+ code=400)
+ if next(iter([clim for clim in self._ds if 'CLIM' in clim]), False):
+ raise NexusProcessingException(reason="Cannot compute correlation on a climatology", code=400)
+
+ nexus_tiles = self._find_global_tile_set()
+ # print 'tiles:'
+ # for tile in nexus_tiles:
+ # print tile.granule
+ # print tile.section_spec
+ # print 'lat:', tile.latitudes
+ # print 'lon:', tile.longitudes
+
+ if len(nexus_tiles) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+ self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+ nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+ nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+ self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+
+ # Create array of tuples to pass to Spark map function
+ nexus_tiles_spark = [[self._find_tile_bounds(t),
+ self._startTime, self._endTime,
+ self._ds] for t in nexus_tiles]
+
+ # Remove empty tiles (should have bounds set to None)
+ bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+ for i in np.flipud(bad_tile_inds):
+ del nexus_tiles_spark[i]
+
+ # Expand Spark map tuple array by duplicating each entry N times,
+ # where N is the number of ways we want the time dimension carved up.
+ num_time_parts = 72
+ # num_time_parts = 2
+ # num_time_parts = 1
+ nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
+ self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+ # Set the time boundaries for each of the Spark map tuples.
+ # Every Nth element in the array gets the same time bounds.
+ spark_part_times = np.linspace(self._startTime, self._endTime + 1,
+ num_time_parts + 1, dtype=np.int64)
+
+ spark_part_time_ranges = \
+ np.repeat([[[spark_part_times[i],
+ spark_part_times[i + 1] - 1] for i in range(num_time_parts)]],
+ len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
+ self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+ nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+ # print 'nexus_tiles_spark final = '
+ # for i in range(len(nexus_tiles_spark)):
+ # print nexus_tiles_spark[i]
+
+ # Launch Spark computations
+ # print 'nexus_tiles_spark=',nexus_tiles_spark
+ rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ sum_tiles_part = rdd.map(self._map)
+ # print "sum_tiles_part = ",sum_tiles_part.collect()
+ sum_tiles = \
+ sum_tiles_part.combineByKey(lambda val: val,
+ lambda x, val: (x[0] + val[0],
+ x[1] + val[1],
+ x[2] + val[2],
+ x[3] + val[3],
+ x[4] + val[4],
+ x[5] + val[5]),
+ lambda x, y: (x[0] + y[0],
+ x[1] + y[1],
+ x[2] + y[2],
+ x[3] + y[3],
+ x[4] + y[4],
+ x[5] + y[5]))
+ # Convert the N (pixel-wise count) array for each tile to be a
+ # NumPy masked array. That is the last array in the tuple of
+ # intermediate summation arrays. Set mask to True if count is 0.
+ sum_tiles = \
+ sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx,
+ sum_yy, sum_xy, n)):
+ (bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy,
+ np.ma.array(n,
+ mask=~(n.astype(bool))))))
+
+ # print 'sum_tiles = ',sum_tiles.collect()
+
+ # For each pixel in each tile compute an array of Pearson
+ # correlation coefficients. The map function is called once
+ # per tile. The result of this map operation is a list of 3-tuples of
+ # (bounds, r, n) for each tile (r=Pearson correlation coefficient
+ # and n=number of input values that went into each pixel with
+ # any masked values not included).
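+ # r = (sum_xy - sum_x*sum_y/n) / sqrt((sum_xx - sum_x^2/n) * (sum_yy - sum_y^2/n))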
+ corr_tiles = \
+ sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx, sum_yy,
+ sum_xy, n)):
+ (bounds,
+ np.ma.array(((sum_xy - sum_x * sum_y / n) /
+ np.sqrt((sum_xx - sum_x * sum_x / n) *
+ (sum_yy - sum_y * sum_y / n))),
+ mask=~(n.astype(bool))),
+ n)).collect()
+
+ r = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+ n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+
+ # The tiles below are NOT Nexus objects. They are tuples
+ # with the following for each correlation map subset:
+ # (1) lat-lon bounding box, (2) array of correlation r values,
+ # and (3) array of count n values.
+ for tile in corr_tiles:
+ ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+ tile_data, tile_cnt) = tile
+ y0 = self._lat2ind(tile_min_lat)
+ y1 = self._lat2ind(tile_max_lat)
+ x0 = self._lon2ind(tile_min_lon)
+ x1 = self._lon2ind(tile_max_lon)
+ self.log.debug(
+ 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon,
+ y0, y1, x0, x1))
+ r[y0:y1 + 1, x0:x1 + 1] = tile_data
+ n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+
+ # Store global map in a NetCDF file.
+ self._create_nc_file(r, 'corrmap.nc', 'r')
+
+ # Create dict for JSON response
+ results = [[{'r': r[y, x], 'cnt': int(n[y, x]),
+ 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+ for x in range(r.shape[1])] for y in range(r.shape[0])]
+
+ return CorrelationResults(results)
+
+
+class CorrelationResults(NexusResults):
+ def __init__(self, results):
+ NexusResults.__init__(self)
+ self.results = results
+
+ def toJson(self):
+ json_d = {
+ "stats": {},
+ "meta": [None, None],
+ "data": self.results
+ }
+ return json.dumps(json_d, indent=4)