Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:25 UTC
[48/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all
code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/BaseDomsHandler.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
new file mode 100644
index 0000000..cc4d654
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -0,0 +1,709 @@
+import StringIO
+import csv
+import json
+import os
+from datetime import datetime
+
+import numpy as np
+from decimal import Decimal
+from pytz import timezone, UTC
+
+import config
+import geo
+from webservice.NexusHandler import NexusHandler as BaseHandler
+from webservice.webmodel import NexusResults
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+try:
+ from osgeo import gdal
+ from osgeo.gdalnumeric import *
+except ImportError:
+ import gdal
+ from gdalnumeric import *
+
+from netCDF4 import Dataset
+import tempfile
+
+
+class BaseDomsQueryHandler(BaseHandler):
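+    """Base class for DOMS request handlers; resolves in-situ data sources by name from config.ENDPOINTS."""
+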
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def getDataSourceByName(self, source):
+ for s in config.ENDPOINTS:
+ if s["name"] == source:
+ return s
+ return None
+
+ def _does_datasource_exist(self, ds):
+ for endpoint in config.ENDPOINTS:
+ if endpoint["name"] == ds:
+ return True
+ return False
+
+
+class DomsEncoder(json.JSONEncoder):
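+    """JSON encoder that emits NaN as null, datetimes as integer seconds since epoch, and Decimals as strings."""
+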
+ def __init__(self, **args):
+ json.JSONEncoder.__init__(self, **args)
+
+    def default(self, obj):
+        if isinstance(obj, float) and np.isnan(obj):
+            return None  # emit JSON null for NaN ('obj == np.nan' is always False)
+        elif isinstance(obj, datetime):
+            return long((obj - EPOCH).total_seconds())
+        elif isinstance(obj, Decimal):
+            return str(obj)
+        else:
+            return json.JSONEncoder.default(self, obj)
+
+
+class DomsQueryResults(NexusResults):
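+    """NexusResults wrapper that adds DOMS execution metadata and JSON/CSV/NetCDF serialization."""
+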
+ def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
+ executionId=None, status_code=200):
+ NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
+ status_code=status_code)
+ self.__args = args
+ self.__bounds = bounds
+ self.__count = count
+ self.__details = details
+ self.__executionId = str(executionId)
+
+ def toJson(self):
+ bounds = self.__bounds.toMap() if self.__bounds is not None else {}
+ return json.dumps(
+ {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds,
+ "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder)
+
+ def toCSV(self):
+ return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
+
+ def toNetCDF(self):
+ return DomsNetCDFFormatterAlt.create(self.__executionId, self.results(), self.__args, self.__details)
+
+
+class DomsCSVFormatter:
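+    """Renders matchup results as CSV: global-attribute header rows followed by one row per primary/matchup pair."""
+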
+ @staticmethod
+ def create(executionId, results, params, details):
+
+ csv_mem_file = StringIO.StringIO()
+ try:
+ DomsCSVFormatter.__addConstants(csv_mem_file)
+ DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details)
+ csv.writer(csv_mem_file).writerow([])
+
+ DomsCSVFormatter.__packValues(csv_mem_file, results)
+
+ csv_out = csv_mem_file.getvalue()
+ finally:
+ csv_mem_file.close()
+
+ return csv_out
+
+ @staticmethod
+ def __packValues(csv_mem_file, results):
+
+ writer = csv.writer(csv_mem_file)
+
+ headers = [
+ # Primary
+ "id", "source", "lon", "lat", "time", "platform", "sea_water_salinity_depth", "sea_water_salinity",
+ "sea_water_temperature_depth", "sea_water_temperature", "wind_speed", "wind_direction", "wind_u", "wind_v",
+ # Match
+ "id", "source", "lon", "lat", "time", "platform", "sea_water_salinity_depth", "sea_water_salinity",
+ "sea_water_temperature_depth", "sea_water_temperature", "wind_speed", "wind_direction", "wind_u", "wind_v"
+ ]
+
+ writer.writerow(headers)
+
+ for primaryValue in results:
+ for matchup in primaryValue["matches"]:
+ row = [
+ # Primary
+ primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]),
+ primaryValue["time"].strftime(ISO_8601), primaryValue["platform"],
+ primaryValue.get("sea_water_salinity_depth", ""), primaryValue.get("sea_water_salinity", ""),
+ primaryValue.get("sea_water_temperature_depth", ""), primaryValue.get("sea_water_temperature", ""),
+ primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""),
+ primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""),
+
+ # Matchup
+ matchup["id"], matchup["source"], matchup["x"], matchup["y"],
+ matchup["time"].strftime(ISO_8601), matchup["platform"],
+ matchup.get("sea_water_salinity_depth", ""), matchup.get("sea_water_salinity", ""),
+ matchup.get("sea_water_temperature_depth", ""), matchup.get("sea_water_temperature", ""),
+ matchup.get("wind_speed", ""), matchup.get("wind_direction", ""),
+ matchup.get("wind_u", ""), matchup.get("wind_v", ""),
+ ]
+
+ writer.writerow(row)
+
+ @staticmethod
+ def __addConstants(csvfile):
+
+ global_attrs = [
+ {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"},
+ {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"},
+ {"Global Attribute": "history",
+ "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"},
+ {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"},
+ {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"},
+ {"Global Attribute": "standard_name_vocabulary",
+ "Value": "CF Standard Name Table v27, BODC controlled vocabulary"},
+ {"Global Attribute": "cdm_data_type", "Value": "Point/Profile, Swath/Grid"},
+ {"Global Attribute": "processing_level", "Value": "4"},
+ {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"},
+ {"Global Attribute": "keywords_vocabulary",
+ "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"},
+ # TODO What should the keywords be?
+ {"Global Attribute": "keywords", "Value": ""},
+ {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"},
+ {"Global Attribute": "creator_email", "Value": "podaac@podaac.jpl.nasa.gov"},
+ {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"},
+ {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"},
+ {"Global Attribute": "publisher_email", "Value": "podaac@podaac.jpl.nasa.gov"},
+ {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"},
+ {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."},
+ ]
+
+ writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
+
+ writer.writerows(global_attrs)
+
+ @staticmethod
+ def __addDynamicAttrs(csvfile, executionId, results, params, details):
+
+ platforms = set()
+ for primaryValue in results:
+ platforms.add(primaryValue['platform'])
+ for match in primaryValue['matches']:
+ platforms.add(match['platform'])
+
+ global_attrs = [
+ {"Global Attribute": "Platform", "Value": ', '.join(platforms)},
+ {"Global Attribute": "time_coverage_start",
+ "Value": params["startTime"].strftime(ISO_8601)},
+ {"Global Attribute": "time_coverage_end",
+ "Value": params["endTime"].strftime(ISO_8601)},
+ # TODO I don't think this applies
+ # {"Global Attribute": "time_coverage_resolution", "Value": "point"},
+
+ {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]},
+ {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]},
+ {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]},
+ {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]},
+ {"Global Attribute": "geospatial_lat_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_lon_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"},
+ {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"},
+
+ {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]},
+ {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]},
+ {"Global Attribute": "geospatial_vertical_units", "Value": "m"},
+ {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_vertical_positive", "Value": "down"},
+
+ {"Global Attribute": "DOMS_matchID", "Value": executionId},
+ {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60},
+ {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"},
+ {"Global Attribute": "DOMS_depth_min", "Value": params["depthMin"]},
+ {"Global Attribute": "DOMS_depth_min_units", "Value": "m"},
+ {"Global Attribute": "DOMS_depth_max", "Value": params["depthMax"]},
+ {"Global Attribute": "DOMS_depth_max_units", "Value": "m"},
+
+ {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]},
+ {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]},
+ {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"},
+ {"Global Attribute": "DOMS_bounding_box", "Value": params["bbox"]},
+
+ {"Global Attribute": "DOMS_primary", "Value": params["primary"]},
+ {"Global Attribute": "DOMS_match-up", "Value": ",".join(params["matchup"])},
+ {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")},
+
+ {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]},
+ {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"},
+ {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]},
+ {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]},
+ {"Global Attribute": "DOMS_num_matchup_checked",
+ "Value": details["numInSituChecked"] if details["numInSituChecked"] != 0 else "N/A"},
+ {"Global Attribute": "DOMS_num_primary_checked",
+ "Value": details["numGriddedChecked"] if details["numGriddedChecked"] != 0 else "N/A"},
+
+ {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
+ {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
+ ]
+
+ writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
+
+ writer.writerows(global_attrs)
+
+
+class DomsNetCDFFormatter:
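+    """Writes matchup results to a flat NetCDF4 file: one unlimited dimension and variable per measured quantity, plus CF/ACDD global attributes."""
+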
+ @staticmethod
+ def create(executionId, results, params, details):
+        fd, tempFileName = tempfile.mkstemp(prefix="doms_", suffix=".nc")
+        os.close(fd)  # netCDF4 reopens the file by path; release the raw descriptor
+
+ dataset = Dataset(tempFileName, "w", format="NETCDF4")
+
+ dataset.matchID = executionId
+ dataset.Matchup_TimeWindow = params["timeTolerance"]
+ dataset.Matchup_TimeWindow_Units = "hours"
+
+ dataset.time_coverage_start = datetime.fromtimestamp(params["startTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+ dataset.time_coverage_end = datetime.fromtimestamp(params["endTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+ dataset.depth_min = params["depthMin"]
+ dataset.depth_max = params["depthMax"]
+ dataset.platforms = params["platforms"]
+
+ dataset.Matchup_SearchRadius = params["radiusTolerance"]
+ dataset.Matchup_SearchRadius_Units = "m"
+
+ dataset.bounding_box = params["bbox"]
+ dataset.primary = params["primary"]
+ dataset.secondary = ",".join(params["matchup"])
+
+ dataset.Matchup_ParameterPrimary = params["parameter"] if "parameter" in params else ""
+
+ dataset.time_coverage_resolution = "point"
+
+ bbox = geo.BoundingBox(asString=params["bbox"])
+ dataset.geospatial_lat_max = bbox.north
+ dataset.geospatial_lat_min = bbox.south
+ dataset.geospatial_lon_max = bbox.east
+ dataset.geospatial_lon_min = bbox.west
+ dataset.geospatial_lat_resolution = "point"
+ dataset.geospatial_lon_resolution = "point"
+ dataset.geospatial_lat_units = "degrees_north"
+ dataset.geospatial_lon_units = "degrees_east"
+ dataset.geospatial_vertical_min = 0.0
+ dataset.geospatial_vertical_max = params["radiusTolerance"]
+ dataset.geospatial_vertical_units = "m"
+ dataset.geospatial_vertical_resolution = "point"
+ dataset.geospatial_vertical_positive = "down"
+
+ dataset.time_to_complete = details["timeToComplete"]
+ dataset.num_insitu_matched = details["numInSituMatched"]
+ dataset.num_gridded_checked = details["numGriddedChecked"]
+ dataset.num_gridded_matched = details["numGriddedMatched"]
+ dataset.num_insitu_checked = details["numInSituChecked"]
+
+ dataset.date_modified = datetime.now().strftime('%Y%m%d %H:%M:%S')
+ dataset.date_created = datetime.now().strftime('%Y%m%d %H:%M:%S')
+
+ DomsNetCDFFormatter.__addNetCDFConstants(dataset)
+
+ idList = []
+ primaryIdList = []
+ DomsNetCDFFormatter.__packDataIntoDimensions(idList, primaryIdList, results)
+
+ idDim = dataset.createDimension("id", size=None)
+ primaryIdDim = dataset.createDimension("primary_id", size=None)
+
+ idVar = dataset.createVariable("id", "i4", ("id",), chunksizes=(2048,))
+ primaryIdVar = dataset.createVariable("primary_id", "i4", ("primary_id",), chunksizes=(2048,))
+
+ idVar[:] = idList
+ primaryIdVar[:] = primaryIdList
+
+ DomsNetCDFFormatter.__createDimension(dataset, results, "lat", "f4", "y")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "lon", "f4", "x")
+
+ DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_temperature_depth", "f4",
+ "sea_water_temperature_depth")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_temperature", "f4", "sea_water_temperature")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_salinity_depth", "f4",
+ "sea_water_salinity_depth")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_salinity", "f4", "sea_water_salinity")
+
+ DomsNetCDFFormatter.__createDimension(dataset, results, "wind_speed", "f4", "wind_speed")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "wind_direction", "f4", "wind_direction")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "wind_u", "f4", "wind_u")
+ DomsNetCDFFormatter.__createDimension(dataset, results, "wind_v", "f4", "wind_v")
+
+ DomsNetCDFFormatter.__createDimension(dataset, results, "time", "f4", "time")
+ dataset.close()
+
+        with open(tempFileName, "rb") as f:
+            data = f.read()
+        os.unlink(tempFileName)
+ return data
+
+ @staticmethod
+ def __packDataIntoDimensions(idVar, primaryIdVar, values, primaryValueId=None):
+
+ for value in values:
+ id = hash(value["id"])
+ idVar.append(id)
+ primaryIdVar.append(primaryValueId if primaryValueId is not None else -1)
+
+ if "matches" in value and len(value["matches"]) > 0:
+ DomsNetCDFFormatter.__packDataIntoDimensions(idVar, primaryIdVar, value["matches"], id)
+
+ @staticmethod
+ def __packDimensionList(values, field, varList):
+ for value in values:
+ if field in value:
+ varList.append(value[field])
+ else:
+ varList.append(np.nan)
+ if "matches" in value and len(value["matches"]) > 0:
+ DomsNetCDFFormatter.__packDimensionList(value["matches"], field, varList)
+
+ @staticmethod
+ def __createDimension(dataset, values, name, type, arrayField):
+ dim = dataset.createDimension(name, size=None)
+ var = dataset.createVariable(name, type, (name,), chunksizes=(2048,), fill_value=-32767.0)
+
+ varList = []
+ DomsNetCDFFormatter.__packDimensionList(values, arrayField, varList)
+
+ var[:] = varList
+
+ if name == "lon":
+ DomsNetCDFFormatter.__enrichLonVariable(var)
+ elif name == "lat":
+ DomsNetCDFFormatter.__enrichLatVariable(var)
+ elif name == "time":
+ DomsNetCDFFormatter.__enrichTimeVariable(var)
+ elif name == "sea_water_salinity":
+ DomsNetCDFFormatter.__enrichSSSVariable(var)
+ elif name == "sea_water_salinity_depth":
+ DomsNetCDFFormatter.__enrichSSSDepthVariable(var)
+ elif name == "sea_water_temperature":
+ DomsNetCDFFormatter.__enrichSSTVariable(var)
+ elif name == "sea_water_temperature_depth":
+ DomsNetCDFFormatter.__enrichSSTDepthVariable(var)
+ elif name == "wind_direction":
+ DomsNetCDFFormatter.__enrichWindDirectionVariable(var)
+ elif name == "wind_speed":
+ DomsNetCDFFormatter.__enrichWindSpeedVariable(var)
+ elif name == "wind_u":
+ DomsNetCDFFormatter.__enrichWindUVariable(var)
+ elif name == "wind_v":
+ DomsNetCDFFormatter.__enrichWindVVariable(var)
+
+ @staticmethod
+ def __enrichSSSVariable(var):
+ var.long_name = "sea surface salinity"
+ var.standard_name = "sea_surface_salinity"
+ var.units = "1e-3"
+ var.valid_min = 30
+ var.valid_max = 40
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichSSSDepthVariable(var):
+ var.long_name = "sea surface salinity_depth"
+ var.standard_name = "sea_surface_salinity_depth"
+ var.units = "m"
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichSSTVariable(var):
+ var.long_name = "sea surface temperature"
+ var.standard_name = "sea_surface_temperature"
+ var.units = "c"
+ var.valid_min = -3
+ var.valid_max = 50
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichSSTDepthVariable(var):
+ var.long_name = "sea surface temperature_depth"
+ var.standard_name = "sea_surface_temperature_depth"
+ var.units = "m"
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichWindDirectionVariable(var):
+ var.long_name = "wind direction"
+ var.standard_name = "wind_direction"
+ var.units = "degrees"
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichWindSpeedVariable(var):
+ var.long_name = "wind speed"
+ var.standard_name = "wind_speed"
+ var.units = "km/h"
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichWindUVariable(var):
+ var.long_name = "wind u"
+ var.standard_name = "wind_u"
+ var.units = ""
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichWindVVariable(var):
+ var.long_name = "wind v"
+ var.standard_name = "wind_v"
+ var.units = ""
+ var.scale_factor = 1.0
+ var.add_offset = 0.0
+ var.coordinates = "lon lat time"
+ var.grid_mapping = "crs"
+ var.comment = ""
+ var.cell_methods = ""
+ var.metadata_link = ""
+
+ @staticmethod
+ def __enrichTimeVariable(var):
+ var.long_name = "Time"
+ var.standard_name = "time"
+ var.axis = "T"
+ var.units = "seconds since 1970-01-01 00:00:00 0:00"
+ var.calendar = "standard"
+ var.comment = "Nominal time of satellite corresponding to the start of the product time interval"
+
+ @staticmethod
+ def __enrichLonVariable(var):
+ var.long_name = "Longitude"
+ var.standard_name = "longitude"
+ var.axis = "X"
+ var.units = "degrees_east"
+ var.valid_min = -180.0
+ var.valid_max = 180.0
+ var.comment = "Data longitude for in-situ, midpoint beam for satellite measurements."
+
+ @staticmethod
+ def __enrichLatVariable(var):
+ var.long_name = "Latitude"
+ var.standard_name = "latitude"
+ var.axis = "Y"
+ var.units = "degrees_north"
+ var.valid_min = -90.0
+ var.valid_max = 90.0
+ var.comment = "Data latitude for in-situ, midpoint beam for satellite measurements."
+
+ @staticmethod
+ def __addNetCDFConstants(dataset):
+ dataset.bnds = 2
+ dataset.Conventions = "CF-1.6, ACDD-1.3"
+ dataset.title = "DOMS satellite-insitu machup output file"
+ dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+ dataset.institution = "JPL, FSU, NCAR"
+ dataset.source = "doms.jpl.nasa.gov"
+        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
+ dataset.cdm_data_type = "Point/Profile, Swath/Grid"
+ dataset.processing_level = "4"
+ dataset.platform = "Endeavor"
+ dataset.instrument = "Endeavor on-board sea-bird SBE 9/11 CTD"
+ dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+ dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
+ dataset.keywords = "Salinity, Upper Ocean, SPURS, CTD, Endeavor, Atlantic Ocean"
+ dataset.creator_name = "NASA PO.DAAC"
+ dataset.creator_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.creator_url = "https://podaac.jpl.nasa.gov/"
+ dataset.publisher_name = "NASA PO.DAAC"
+ dataset.publisher_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.publisher_url = "https://podaac.jpl.nasa.gov"
+ dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. Grant number ####."
+
+
+class DomsNetCDFFormatterAlt:
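+    """Alternate NetCDF writer: satellite and in-situ points go into separate groups, linked by parallel satellite_ids/insitu_ids index variables."""
+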
+ @staticmethod
+ def create(executionId, results, params, details):
+        fd, tempFileName = tempfile.mkstemp(prefix="doms_", suffix=".nc")
+        os.close(fd)  # netCDF4 reopens the file by path; release the raw descriptor
+
+ dataset = Dataset(tempFileName, "w", format="NETCDF4")
+
+ dataset.matchID = executionId
+ dataset.Matchup_TimeWindow = params["timeTolerance"]
+ dataset.Matchup_TimeWindow_Units = "hours"
+
+ dataset.time_coverage_start = datetime.fromtimestamp(params["startTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+ dataset.time_coverage_end = datetime.fromtimestamp(params["endTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+ dataset.depth_min = params["depthMin"]
+ dataset.depth_max = params["depthMax"]
+ dataset.platforms = params["platforms"]
+
+ dataset.Matchup_SearchRadius = params["radiusTolerance"]
+ dataset.Matchup_SearchRadius_Units = "m"
+
+ dataset.bounding_box = params["bbox"]
+ dataset.primary = params["primary"]
+ dataset.secondary = ",".join(params["matchup"])
+
+ dataset.Matchup_ParameterPrimary = params["parameter"] if "parameter" in params else ""
+
+ dataset.time_coverage_resolution = "point"
+
+ bbox = geo.BoundingBox(asString=params["bbox"])
+ dataset.geospatial_lat_max = bbox.north
+ dataset.geospatial_lat_min = bbox.south
+ dataset.geospatial_lon_max = bbox.east
+ dataset.geospatial_lon_min = bbox.west
+ dataset.geospatial_lat_resolution = "point"
+ dataset.geospatial_lon_resolution = "point"
+ dataset.geospatial_lat_units = "degrees_north"
+ dataset.geospatial_lon_units = "degrees_east"
+ dataset.geospatial_vertical_min = 0.0
+ dataset.geospatial_vertical_max = params["radiusTolerance"]
+ dataset.geospatial_vertical_units = "m"
+ dataset.geospatial_vertical_resolution = "point"
+ dataset.geospatial_vertical_positive = "down"
+
+ dataset.time_to_complete = details["timeToComplete"]
+ dataset.num_insitu_matched = details["numInSituMatched"]
+ dataset.num_gridded_checked = details["numGriddedChecked"]
+ dataset.num_gridded_matched = details["numGriddedMatched"]
+ dataset.num_insitu_checked = details["numInSituChecked"]
+
+ dataset.date_modified = datetime.now().strftime('%Y%m%d %H:%M:%S')
+ dataset.date_created = datetime.now().strftime('%Y%m%d %H:%M:%S')
+
+ DomsNetCDFFormatterAlt.__addNetCDFConstants(dataset)
+
+ satelliteGroup = dataset.createGroup("SatelliteData")
+ satelliteWriter = DomsNetCDFValueWriter(satelliteGroup)
+
+ insituGroup = dataset.createGroup("InsituData")
+ insituWriter = DomsNetCDFValueWriter(insituGroup)
+
+ matches = DomsNetCDFFormatterAlt.__writeResults(results, satelliteWriter, insituWriter)
+
+ satelliteWriter.commit()
+ insituWriter.commit()
+
+ satDim = dataset.createDimension("satellite_ids", size=None)
+ satVar = dataset.createVariable("satellite_ids", "i4", ("satellite_ids",), chunksizes=(2048,),
+ fill_value=-32767)
+
+ satVar[:] = [f[0] for f in matches]
+
+ insituDim = dataset.createDimension("insitu_ids", size=None)
+ insituVar = dataset.createVariable("insitu_ids", "i4", ("insitu_ids",), chunksizes=(2048,),
+ fill_value=-32767)
+ insituVar[:] = [f[1] for f in matches]
+
+ dataset.close()
+
+        with open(tempFileName, "rb") as f:
+            data = f.read()
+        os.unlink(tempFileName)
+ return data
+
+ @staticmethod
+ def __writeResults(results, satelliteWriter, insituWriter):
+ ids = {}
+ matches = []
+
+ insituIndex = 0
+
+        for r, result in enumerate(results):
+ satelliteWriter.write(result)
+ for match in result["matches"]:
+ if match["id"] not in ids:
+ ids[match["id"]] = insituIndex
+ insituIndex += 1
+ insituWriter.write(match)
+
+ matches.append((r, ids[match["id"]]))
+
+ return matches
+
+ @staticmethod
+ def __addNetCDFConstants(dataset):
+ dataset.bnds = 2
+ dataset.Conventions = "CF-1.6, ACDD-1.3"
+ dataset.title = "DOMS satellite-insitu machup output file"
+ dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+ dataset.institution = "JPL, FSU, NCAR"
+ dataset.source = "doms.jpl.nasa.gov"
+        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
+ dataset.cdm_data_type = "Point/Profile, Swath/Grid"
+ dataset.processing_level = "4"
+ dataset.platform = "Endeavor"
+ dataset.instrument = "Endeavor on-board sea-bird SBE 9/11 CTD"
+ dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+ dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
+ dataset.keywords = "Salinity, Upper Ocean, SPURS, CTD, Endeavor, Atlantic Ocean"
+ dataset.creator_name = "NASA PO.DAAC"
+ dataset.creator_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.creator_url = "https://podaac.jpl.nasa.gov/"
+ dataset.publisher_name = "NASA PO.DAAC"
+ dataset.publisher_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.publisher_url = "https://podaac.jpl.nasa.gov"
+ dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. Grant number ####."
+
+
+class DomsNetCDFValueWriter:
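+    """Accumulates lat/lon/time/SST values for one NetCDF group and flushes them to variables on commit()."""
+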
+ def __init__(self, group):
+ self.latVar = DomsNetCDFValueWriter.__createDimension(group, "lat", "f4")
+ self.lonVar = DomsNetCDFValueWriter.__createDimension(group, "lon", "f4")
+ self.sstVar = DomsNetCDFValueWriter.__createDimension(group, "sea_water_temperature", "f4")
+ self.timeVar = DomsNetCDFValueWriter.__createDimension(group, "time", "f4")
+
+ self.lat = []
+ self.lon = []
+ self.sst = []
+ self.time = []
+
+ def write(self, value):
+ self.lat.append(value["y"])
+ self.lon.append(value["x"])
+ self.time.append(value["time"])
+ self.sst.append(value["sea_water_temperature"])
+
+ def commit(self):
+ self.latVar[:] = self.lat
+ self.lonVar[:] = self.lon
+ self.sstVar[:] = self.sst
+ self.timeVar[:] = self.time
+
+ @staticmethod
+ def __createDimension(group, name, type):
+ dim = group.createDimension(name, size=None)
+ var = group.createVariable(name, type, (name,), chunksizes=(2048,), fill_value=-32767.0)
+ return var
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/DatasetListQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py
new file mode 100644
index 0000000..4a08517
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/DatasetListQuery.py
@@ -0,0 +1,106 @@
+from webservice.NexusHandler import NexusHandler as BaseHandler
+from webservice.webmodel import StatsComputeOptions
+from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, DatasetNotFoundException, cached
+import BaseDomsHandler
+import datafetch
+import config
+import requests
+import json
+import values
+import traceback
+
+@nexus_handler
+class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+
+ name = "DOMS Dataset Listing"
+ path = "/domslist"
+ description = ""
+ params = {}
+ singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+
+ def getFacetsForInsituSource(self, source):
+ url = source["url"]
+
+ params = {
+ "facet": "true",
+ "stats": "true",
+ "startIndex": 0,
+ "itemsPerPage": 0
+ }
+ try:
+ r = requests.get(url, params=params)
+ results = json.loads(r.text)
+
+ depths = None
+ if "stats_fields" in results and "depth" in results["stats_fields"]:
+ depths = results["stats_fields"]["depth"]
+
+ for facet in results["facets"]:
+ field = facet["field"]
+ for value in facet["values"]:
+ value["value"] = values.getDescByListNameAndId(field, int(value["value"]))
+
+ return depths, results["facets"]
+        except Exception: # KMG: Don't eat the exception. Add better handling...
+ traceback.print_exc()
+ return None, None
+
+
+ def getMetadataUrlForDataset(self, dataset):
+ datasetSpec = config.getEndpointByName(dataset)
+ if datasetSpec is not None:
+ return datasetSpec["metadataUrl"]
+ else:
+
+ # KMG: NOT a good hack
+ if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
+ dataset = "MUR-JPL-L4-GLOB-v4.1"
+ elif dataset == "SMAP_L2B_SSS":
+ dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
+ elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM":
+ dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0"
+
+ return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json"%dataset
+
+ def getMetadataForSource(self, dataset):
+ try:
+ r = requests.get(self.getMetadataUrlForDataset(dataset))
+ results = json.loads(r.text)
+ return results
+        except Exception:
+ return None
+
+ @cached(ttl=(60 * 60 * 1000)) # 1 hour cached
+ def calc(self, computeOptions, **args):
+
+ satellitesList = self._tile_service.get_dataseries_list(simple=True)
+
+ insituList = []
+
+ for satellite in satellitesList:
+ satellite["metadata"] = self.getMetadataForSource(satellite["shortName"])
+
+
+ for insitu in config.ENDPOINTS:
+ depths, facets = self.getFacetsForInsituSource(insitu)
+            insituList.append({
+                "name": insitu["name"],
+                "endpoint": insitu["url"],
+                "metadata": self.getMetadataForSource(insitu["name"]),
+                "depths": depths,
+                "facets": facets
+            })
+
+
+        result = {
+            "satellite": satellitesList,
+            "insitu": insituList
+        }
+
+        return BaseDomsHandler.DomsQueryResults(results=result)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/DomsInitialization.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
new file mode 100644
index 0000000..bc92ebf
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -0,0 +1,133 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+import ConfigParser
+import logging
+import pkg_resources
+
+from cassandra import InvalidRequest
+from cassandra.cluster import Cluster
+from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
+from webservice.NexusHandler import nexus_initializer
+
+
+@nexus_initializer
+class DomsInitializer:
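+    """Nexus initializer that ensures the DOMS Cassandra keyspace and tables exist at startup."""
+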
+ def __init__(self):
+ pass
+
+ def init(self, config):
+ log = logging.getLogger(__name__)
+ log.info("*** STARTING DOMS INITIALIZATION ***")
+
+ domsconfig = ConfigParser.RawConfigParser()
+ domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+
+ cassHost = domsconfig.get("cassandra", "host")
+ cassKeyspace = domsconfig.get("cassandra", "keyspace")
+ cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+ cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+
+ log.info("Cassandra Host(s): %s" % (cassHost))
+ log.info("Cassandra Keyspace: %s" % (cassKeyspace))
+ log.info("Cassandra Datacenter: %s" % (cassDatacenter))
+ log.info("Cassandra Protocol Version: %s" % (cassVersion))
+
+ dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+ token_policy = TokenAwarePolicy(dc_policy)
+
+        with Cluster(cassHost.split(','), load_balancing_policy=token_policy,
+                     protocol_version=cassVersion) as cluster:
+ session = cluster.connect()
+
+ self.createKeyspace(session, cassKeyspace)
+ self.createTables(session)
+
+ def createKeyspace(self, session, cassKeyspace):
+ log = logging.getLogger(__name__)
+ log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
+ session.execute(
+ "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
+ session.set_keyspace(cassKeyspace)
+
+ def createTables(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying DOMS tables")
+ self.createDomsExecutionsTable(session)
+ self.createDomsParamsTable(session)
+ self.createDomsDataTable(session)
+ self.createDomsExecutionStatsTable(session)
+
+ def createDomsExecutionsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_executions table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_executions (
+ id uuid PRIMARY KEY,
+ time_started timestamp,
+ time_completed timestamp,
+ user_email text
+ );
+ """
+ session.execute(cql)
+
+ def createDomsParamsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_params table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_params (
+ execution_id uuid PRIMARY KEY,
+ primary_dataset text,
+ matchup_datasets text,
+ depth_tolerance decimal,
+ depth_min decimal,
+ depth_max decimal,
+ time_tolerance int,
+ radius_tolerance decimal,
+ start_time timestamp,
+ end_time timestamp,
+ platforms text,
+ bounding_box text,
+ parameter text
+ );
+ """
+ session.execute(cql)
+
+ def createDomsDataTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_data table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_data (
+ id uuid,
+ execution_id uuid,
+ value_id text,
+ primary_value_id text,
+ is_primary boolean,
+ x decimal,
+ y decimal,
+ source_dataset text,
+ measurement_time timestamp,
+ platform text,
+ device text,
+ measurement_values map<text, decimal>,
+ PRIMARY KEY (execution_id, is_primary, id)
+ );
+ """
+ session.execute(cql)
+
+ def createDomsExecutionStatsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_execuction_stats table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_execution_stats (
+ execution_id uuid PRIMARY KEY,
+ num_gridded_matched int,
+ num_gridded_checked int,
+ num_insitu_matched int,
+ num_insitu_checked int,
+ time_to_complete int
+ );
+ """
+ session.execute(cql)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/MatchupQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/MatchupQuery.py b/analysis/webservice/algorithms/doms/MatchupQuery.py
new file mode 100644
index 0000000..8fa5d8e
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/MatchupQuery.py
@@ -0,0 +1,436 @@
+import math
+import uuid
+from datetime import datetime
+
+import numpy as np
+import utm
+from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
+from scipy import spatial
+
+import BaseDomsHandler
+import ResultsStorage
+import datafetch
+import fetchedgeimpl
+import geo
+import workerthread
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
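+    """Matches a primary in-situ dataset against in-situ and gridded matchup datasets within time and radius tolerances."""
+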
+ name = "Experimental Combined DOMS In-Situ Matchup"
+ path = "/domsmatchup"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+ def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms):
+
+ boundsConstrainer = geo.BoundsConstrainer(asString=bbox)
+ threads = []
+ for endpoint in endpoints:
+ thread = workerthread.WorkerThread(datafetch.fetchData,
+ params=(endpoint, startTime, endTime, bbox, depth_min, depth_max))
+ threads.append(thread)
+ workerthread.wait(threads, startFirst=True, poll=0.01)
+
+ data2 = []
+ for thread in threads:
+ data, bounds = thread.results
+ data2 += data
+ boundsConstrainer.testOtherConstrainer(bounds)
+
+ return data2, boundsConstrainer
+
+ def __parseDatetime(self, dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return time
+
+ def calc(self, computeOptions, **args):
+ primary = computeOptions.get_argument("primary", None)
+ matchup = computeOptions.get_argument("matchup", None)
+ startTime = computeOptions.get_argument("s", None)
+ endTime = computeOptions.get_argument("e", None)
+ bbox = computeOptions.get_argument("b", None)
+ timeTolerance = computeOptions.get_float_arg("tt")
+ depth_min = computeOptions.get_float_arg("depthMin", default=None)
+ depth_max = computeOptions.get_float_arg("depthMax", default=None)
+ radiusTolerance = computeOptions.get_float_arg("rt")
+ platforms = computeOptions.get_argument("platforms", None)
+
+ if primary is None or len(primary) == 0:
+ raise Exception("No primary dataset specified")
+
+ if matchup is None or len(matchup) == 0:
+ raise Exception("No matchup datasets specified")
+
+ start = self._now()
+
+ primarySpec = self.getDataSourceByName(primary)
+ if primarySpec is None:
+ raise Exception("Specified primary dataset not found using identifier '%s'" % primary)
+
+ primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+ primaryContext = MatchupContext(primaryData)
+
+ matchupIds = matchup.split(",")
+
+ for matchupId in matchupIds:
+ matchupSpec = self.getDataSourceByName(matchupId)
+
+ if matchupSpec is not None: # Then it's in the in-situ configuration
+ proc = InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min, depth_max,
+ platforms, timeTolerance, radiusTolerance)
+ proc.start()
+ else: # We assume it to be a Nexus tiled dataset
+
+                # Single threaded at the moment...
+ daysinrange = self._tile_service.find_days_in_range_asc(bounds.south, bounds.north, bounds.west,
+ bounds.east, matchupId,
+ self.__parseDatetime(startTime) / 1000,
+ self.__parseDatetime(endTime) / 1000)
+
+ tilesByDay = {}
+ for dayTimestamp in daysinrange:
+ ds1_nexus_tiles = self._tile_service.get_tiles_bounded_by_box_at_time(bounds.south, bounds.north,
+ bounds.west, bounds.east,
+ matchupId, dayTimestamp)
+
+ # print "***", type(ds1_nexus_tiles)
+ # print ds1_nexus_tiles[0].__dict__
+ tilesByDay[dayTimestamp] = ds1_nexus_tiles
+
+ primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance)
+
+ matches, numMatches = primaryContext.getFinal(len(matchupIds))
+
+ end = self._now()
+
+ args = {
+ "primary": primary,
+ "matchup": matchupIds,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "timeTolerance": timeTolerance,
+ "depthMin": depth_min,
+ "depthMax": depth_max,
+ "radiusTolerance": radiusTolerance,
+ "platforms": platforms
+ }
+
+ details = {
+ "timeToComplete": (end - start),
+ "numInSituRecords": primaryContext.insituCount,
+ "numInSituMatched": primaryContext.insituMatches,
+ "numGriddedChecked": primaryContext.griddedCount,
+ "numGriddedMatched": primaryContext.griddedMatched
+ }
+
+ with ResultsStorage.ResultsStorage() as resultsStorage:
+ execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start,
+ completeTime=end, userEmail="")
+
+ return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
+
+
+class MatchupContextMap:
+ def __init__(self):
+ pass
+
+ def add(self, context):
+ pass
+
+ def delete(self, context):
+ pass
+
+
+class MatchupContext:
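+    """Holds primary points in a KD-tree (UTM coordinates) and accumulates matches from gridded and in-situ sources."""
+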
+ def __init__(self, primaryData):
+ self.id = str(uuid.uuid4())
+
+ self.griddedCount = 0
+ self.griddedMatched = 0
+
+ self.insituCount = len(primaryData)
+ self.insituMatches = 0
+
+ self.primary = primaryData
+ for r in self.primary:
+ r["matches"] = []
+
+ self.data = []
+ for s in primaryData:
+ u = utm.from_latlon(s["y"], s["x"])
+ v = (u[0], u[1], 0.0)
+ self.data.append(v)
+
+ if len(self.data) > 0:
+ self.tree = spatial.KDTree(self.data)
+ else:
+ self.tree = None
+
+ def getFinal(self, minMatchesToInclude):
+
+ matched = []
+ ttlMatches = 0
+ for m in self.primary:
+ if len(m["matches"]) >= minMatchesToInclude:
+ matched.append(m)
+ ttlMatches += len(m["matches"])
+
+ return matched, ttlMatches
+
+ def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance):
+ for r in self.primary:
+ foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"],
+ xyTolerance)
+ self.griddedCount += 1
+ self.griddedMatched += len(foundSatNodes)
+ r["matches"].extend(foundSatNodes)
+
+ def processInSitu(self, records, xyTolerance, timeTolerance):
+ if self.tree is not None:
+ for s in records:
+ self.insituCount += 1
+ u = utm.from_latlon(s["y"], s["x"])
+ coords = np.array([u[0], u[1], 0])
+ ball = self.tree.query_ball_point(coords, xyTolerance)
+
+ self.insituMatches += len(ball)
+
+ for i in ball:
+ match = self.primary[i]
+ if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0):
+ match["matches"].append(s)
+
+ def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"):
+ value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName)
+ return value
+
+    def __checkNumber(self, value):
+        if isinstance(value, float) and math.isnan(value):
+            value = None
+        elif value is not None:
+            value = float(value)
+        return value
+
+ def __buildSwathIndexes(self, chunk):
+ latlons = []
+ utms = []
+ indexes = []
+        for i, _lat in enumerate(chunk.latitudes):
+            if isinstance(_lat, np.ma.core.MaskedConstant):
+                continue
+            for j, _lon in enumerate(chunk.longitudes):
+                if isinstance(_lon, np.ma.core.MaskedConstant):
+                    continue
+
+ value = self.__getChunkValueAtIndex(chunk, (i, j))
+                if isinstance(value, float) and math.isnan(value):
+ continue
+
+ u = utm.from_latlon(_lat, _lon)
+ v = (u[0], u[1], 0.0)
+ latlons.append((_lat, _lon))
+ utms.append(v)
+ indexes.append((i, j))
+
+ tree = None
+ if len(latlons) > 0:
+ tree = spatial.KDTree(utms)
+
+ chunk.swathIndexing = {
+ "tree": tree,
+ "latlons": latlons,
+ "indexes": indexes
+ }
+
+ def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance):
+ foundIndexes = []
+ foundLatLons = []
+
+ if "swathIndexing" not in chunk.__dict__:
+ self.__buildSwathIndexes(chunk)
+
+ tree = chunk.swathIndexing["tree"]
+ if tree is not None:
+ indexes = chunk.swathIndexing["indexes"]
+ latlons = chunk.swathIndexing["latlons"]
+ u = utm.from_latlon(lat, lon)
+ coords = np.array([u[0], u[1], 0])
+ ball = tree.query_ball_point(coords, xyTolerance)
+ for i in ball:
+ foundIndexes.append(indexes[i])
+ foundLatLons.append(latlons[i])
+ return foundIndexes, foundLatLons
+
+ def __getChunkValueAtIndex(self, chunk, index, arrayName=None):
+
+ if arrayName is None or arrayName == "data":
+ data_val = chunk.data[0][index[0]][index[1]]
+ else:
+ data_val = chunk.meta_data[arrayName][0][index[0]][index[1]]
+        return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('nan')
+
+ def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance):
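+        # Start with a one-year search window (in milliseconds); it is narrowed to the best match found so far.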
+ timeDiff = 86400 * 365 * 1000
+ foundNodes = []
+
+ for ts in chunksByDay:
+ chunks = chunksByDay[ts]
+ if abs((ts * 1000) - searchTime) < timeDiff:
+ for chunk in chunks:
+ indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance)
+
+                    for index, latlon in zip(indexes, latlons):
+ sst = None
+ sss = None
+ windSpeed = None
+ windDirection = None
+ windU = None
+ windV = None
+
+ value = self.__getChunkValueAtIndex(chunk, index)
+
+                        if isinstance(value, float) and math.isnan(value):
+ continue
+
+ if "GHRSST" in source:
+ sst = value
+ elif "ASCATB" in source:
+ windU = value
+ elif "SSS" in source: # SMAP
+ sss = value
+
+ if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
+ windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir"))
+ if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
+ windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v"))
+ if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
+ windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed"))
+
+ foundNode = {
+ "sea_water_temperature": sst,
+ "sea_water_salinity": sss,
+ "wind_speed": windSpeed,
+ "wind_direction": windDirection,
+ "wind_u": windU,
+ "wind_v": windV,
+ "time": ts,
+ "x": self.__checkNumber(latlon[1]),
+ "y": self.__checkNumber(latlon[0]),
+ "depth": 0,
+ "sea_water_temperature_depth": 0,
+ "source": source,
+ "id": "%s:%s:%s" % (ts, lat, lon)
+ }
+
+ foundNodes.append(foundNode)
+ timeDiff = abs(ts - searchTime)
+
+ return foundNodes
+
+ def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime):
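+        # NOTE: apparently an older variant of __getSatNodeForLatLonAndTime; not referenced elsewhere in this file.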
+
+ timeDiff = 86400 * 365 * 1000
+ foundNodes = []
+
+ for ts in chunksByDay:
+ chunks = chunksByDay[ts]
+ # print chunks
+ # ts = calendar.timegm(chunks.start.utctimetuple()) * 1000
+ if abs((ts * 1000) - searchTime) < timeDiff:
+ value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data")
+ value = self.__checkNumber(value)
+
+ # _Really_ don't like doing it this way...
+
+ sst = None
+ sss = None
+ windSpeed = None
+ windDirection = None
+ windU = None
+ windV = None
+
+ if "GHRSST" in source:
+ sst = value
+
+ if "ASCATB" in source:
+ windU = value
+
+ if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
+ windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir"))
+ if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
+ windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v"))
+ if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
+ windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed"))
+
+ foundNode = {
+ "sea_water_temperature": sst,
+ "sea_water_salinity": sss,
+ "wind_speed": windSpeed,
+ "wind_direction": windDirection,
+ "wind_uv": {
+ "u": windU,
+ "v": windV
+ },
+ "time": ts,
+ "x": lon,
+ "y": lat,
+ "depth": 0,
+ "sea_water_temperature_depth": 0,
+ "source": source,
+ "id": "%s:%s:%s" % (ts, lat, lon)
+ }
+
+ isValidNode = True
+ if "ASCATB" in source and windSpeed is None:
+ isValidNode = None
+
+ if isValidNode:
+ foundNodes.append(foundNode)
+ timeDiff = abs(ts - searchTime)
+
+ return foundNodes
+
+
+class InsituDatasetProcessor:
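+    """Streams pages of in-situ records from an endpoint and matches each page against the primary context."""
+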
+ def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance,
+ radiusTolerance):
+ self.primary = primary
+ self.datasource = datasource
+ self.startTime = startTime
+ self.endTime = endTime
+ self.bbox = bbox
+ self.depth_min = depth_min
+ self.depth_max = depth_max
+ self.platforms = platforms
+ self.timeTolerance = timeTolerance
+ self.radiusTolerance = radiusTolerance
+
+ def start(self):
+ def callback(pageData):
+ self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance)
+
+ fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max,
+ self.platforms, pageCallback=callback)
+
+
+class InsituPageProcessor:
+ def __init__(self):
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/MetadataQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py
new file mode 100644
index 0000000..4161166
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/MetadataQuery.py
@@ -0,0 +1,51 @@
+from webservice.NexusHandler import NexusHandler as BaseHandler
+from webservice.webmodel import StatsComputeOptions
+from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, DatasetNotFoundException
+import BaseDomsHandler
+import datafetch
+import config
+import requests
+import json
+
+@nexus_handler
+class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+
+ name = "DOMS Metadata Listing"
+ path = "/domsmetadata"
+ description = ""
+ params = {}
+ singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+
+ dataset = computeOptions.get_argument("dataset", None)
+ if dataset is None or len(dataset) == 0:
+ raise Exception("'dataset' parameter not specified")
+
+ metadataUrl = self.__getUrlForDataset(dataset)
+
+        try:
+            r = requests.get(metadataUrl)
+            results = json.loads(r.text)
+            return BaseDomsHandler.DomsQueryResults(results=results)
+        except Exception:
+            raise DatasetNotFoundException("Dataset '%s' not found" % dataset)
+
+ def __getUrlForDataset(self, dataset):
+ datasetSpec = config.getEndpointByName(dataset)
+ if datasetSpec is not None:
+ return datasetSpec["metadataUrl"]
+ else:
+
+ # KMG: NOT a good hack
+ if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
+ dataset = "MUR-JPL-L4-GLOB-v4.1"
+ elif dataset == "SMAP_L2B_SSS":
+ dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
+
+ return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json"%dataset
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
new file mode 100644
index 0000000..2755aaf
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
@@ -0,0 +1,40 @@
+import BaseDomsHandler
+import mapplot
+import scatterplot
+import histogramplot
+from webservice.NexusHandler import nexus_handler
+
+
+class PlotTypes:
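+    """Plot type identifiers accepted by the /domsplot 'type' parameter."""
+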
+ SCATTER = "scatter"
+ MAP = "map"
+ HISTOGRAM = "histogram"
+
+
+@nexus_handler
+class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryHandler):
+ name = "DOMS Results Plotting"
+ path = "/domsplot"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ id = computeOptions.get_argument("id", None)
+ parameter = computeOptions.get_argument('parameter', 'sst')
+
+ plotType = computeOptions.get_argument("type", PlotTypes.SCATTER)
+
+ normAndCurve = computeOptions.get_boolean_arg("normandcurve", False)
+
+ if plotType == PlotTypes.SCATTER:
+ return scatterplot.createScatterPlot(id, parameter)
+ elif plotType == PlotTypes.MAP:
+ return mapplot.createMapPlot(id, parameter)
+ elif plotType == PlotTypes.HISTOGRAM:
+ return histogramplot.createHistogramPlot(id, parameter, normAndCurve)
+ else:
+ raise Exception("Unsupported plot type '%s' specified." % plotType)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsRetrieval.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
new file mode 100644
index 0000000..0bc1cbe
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -0,0 +1,34 @@
+import uuid
+
+import BaseDomsHandler
+import ResultsStorage
+from webservice.webmodel import NexusProcessingException
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler):
+ name = "DOMS Resultset Retrieval"
+ path = "/domsresults"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ execution_id = computeOptions.get_argument("id", None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+        except Exception:
+ raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400)
+
+ simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)
+
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
+
+ return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsStorage.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
new file mode 100644
index 0000000..c8d40f0
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -0,0 +1,275 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+import ConfigParser
+import logging
+import uuid
+from datetime import datetime
+
+import numpy as np
+import pkg_resources
+from cassandra.cluster import Cluster
+from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
+from cassandra.query import BatchStatement
+from pytz import UTC
+
+
+class AbstractResultsContainer:
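+    """Context manager that opens a Cassandra session from domsconfig.ini on __enter__ and shuts it down on __exit__."""
+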
+ def __init__(self):
+ self._log = logging.getLogger(__name__)
+ self._log.info("Creating DOMS Results Storage Instance")
+
+ self._session = None
+
+ def __enter__(self):
+ domsconfig = ConfigParser.RawConfigParser()
+ domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+
+ cassHost = domsconfig.get("cassandra", "host")
+ cassKeyspace = domsconfig.get("cassandra", "keyspace")
+ cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+ cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+
+ dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+ token_policy = TokenAwarePolicy(dc_policy)
+
+        self._cluster = Cluster(cassHost.split(','), load_balancing_policy=token_policy,
+                                protocol_version=cassVersion)
+
+ self._session = self._cluster.connect(cassKeyspace)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._cluster.shutdown()
+
+ def _parseDatetime(self, dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return int(time)
+
+
+class ResultsStorage(AbstractResultsContainer):
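+    """Persists a matchup execution: execution row, parameters, stats, and batched primary/matchup data rows."""
+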
+ def __init__(self):
+ AbstractResultsContainer.__init__(self)
+
+ def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
+ if isinstance(execution_id, basestring):
+ execution_id = uuid.UUID(execution_id)
+
+ execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+ self.__insertParams(execution_id, params)
+ self.__insertStats(execution_id, stats)
+ self.__insertResults(execution_id, results)
+ return execution_id
+
+ def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+ if execution_id is None:
+ execution_id = uuid.uuid4()
+
+ cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
+ self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+ return execution_id
+
+ def __insertParams(self, execution_id, params):
+ cql = """INSERT INTO doms_params
+ (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
+ VALUES
+ (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ """
+        self._session.execute(cql, (execution_id,
+                                    params["primary"],
+                                    ",".join(params["matchup"]) if isinstance(params["matchup"], list) else params["matchup"],
+                                    params.get("depthMin"),
+                                    params.get("depthMax"),
+                                    int(params["timeTolerance"]),
+                                    params["radiusTolerance"],
+                                    params["startTime"],
+                                    params["endTime"],
+                                    params["platforms"],
+                                    params["bbox"],
+                                    params.get("parameter")
+                                    ))
+
+    def __insertStats(self, execution_id, stats):
+        cql = """
+           INSERT INTO doms_execution_stats
+                (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete)
+           VALUES
+                (%s, %s, %s, %s, %s, %s)
+        """
+        self._session.execute(cql, (
+            execution_id,
+            stats["numGriddedMatched"],
+            stats["numGriddedChecked"],
+            stats["numInSituMatched"],
+            stats["numInSituRecords"],
+            stats["timeToComplete"]
+        ))
+
+    def __insertResults(self, execution_id, results):
+        cql = """
+           INSERT INTO doms_data
+                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
+           VALUES
+                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+        """
+        # One prepared statement is reused for every row; rows are grouped into
+        # batches that __insertResult commits periodically as it recurses.
+        insertStatement = self._session.prepare(cql)
+        batch = BatchStatement()
+
+        for result in results:
+            self.__insertResult(execution_id, None, result, batch, insertStatement)
+
+        self._session.execute(batch)
+
+    def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
+        dataMap = self.__buildDataMap(result)
+        result_id = uuid.uuid4()
+        batch.add(insertStatement, (
+            result_id,
+            execution_id,
+            result["id"],
+            primaryId,
+            result["x"],
+            result["y"],
+            result["source"],
+            result["time"],
+            result.get("platform"),
+            result.get("device"),
+            dataMap,
+            1 if primaryId is None else 0
+        ))
+
+        # Recurse into secondary matches. Only the top-level (primary) call owns
+        # the batch, so only it commits; flush every 20 rows to keep batches small.
+        n = 0
+        if "matches" in result:
+            for match in result["matches"]:
+                self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
+                n += 1
+                if n >= 20:
+                    if primaryId is None:
+                        self.__commitBatch(batch)
+                    n = 0
+
+        if primaryId is None:
+            self.__commitBatch(batch)
+
+    def __commitBatch(self, batch):
+        self._session.execute(batch)
+        batch.clear()
+
+    def __buildDataMap(self, result):
+        # Collect the numeric measurement fields, skipping the structural keys
+        dataMap = {}
+        for name in result:
+            value = result[name]
+            if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] \
+                    and isinstance(value, (float, int)):
+                dataMap[name] = value
+        return dataMap
+
+
+class ResultsRetrieval(AbstractResultsContainer):
+    def __init__(self):
+        AbstractResultsContainer.__init__(self)
+
+    def retrieveResults(self, execution_id, trim_data=False):
+        if isinstance(execution_id, basestring):
+            execution_id = uuid.UUID(execution_id)
+
+        params = self.__retrieveParams(execution_id)
+        stats = self.__retrieveStats(execution_id)
+        data = self.__retrieveData(execution_id, trim_data=trim_data)
+        return params, stats, data
+
+    def __retrieveData(self, id, trim_data=False):
+        dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
+        self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
+        return dataMap.values()
+
+    def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
+        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false"
+        rows = self._session.execute(cql, (id,))
+
+        for row in rows:
+            entry = self.__rowToDataEntry(row, trim_data=trim_data)
+            if row.primary_value_id in dataMap:
+                if "matches" not in dataMap[row.primary_value_id]:
+                    dataMap[row.primary_value_id]["matches"] = []
+                dataMap[row.primary_value_id]["matches"].append(entry)
+            else:
+                self._log.warning("Match row %s has no corresponding primary entry" % str(row))
+
+    def __retrievePrimaryData(self, id, trim_data=False):
+        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
+        rows = self._session.execute(cql, (id,))
+
+        dataMap = {}
+        for row in rows:
+            entry = self.__rowToDataEntry(row, trim_data=trim_data)
+            dataMap[row.value_id] = entry
+        return dataMap
+
+    def __rowToDataEntry(self, row, trim_data=False):
+        if trim_data:
+            entry = {
+                "x": float(row.x),
+                "y": float(row.y),
+                "source": row.source_dataset,
+                "time": row.measurement_time.replace(tzinfo=UTC)
+            }
+        else:
+            entry = {
+                "id": row.value_id,
+                "x": float(row.x),
+                "y": float(row.y),
+                "source": row.source_dataset,
+                "device": row.device,
+                "platform": row.platform,
+                "time": row.measurement_time.replace(tzinfo=UTC)
+            }
+        for key in row.measurement_values:
+            entry[key] = float(row.measurement_values[key])
+        return entry
+
+    def __retrieveStats(self, id):
+        cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
+        rows = self._session.execute(cql, (id,))
+        for row in rows:
+            return {
+                "numGriddedMatched": row.num_gridded_matched,
+                "numGriddedChecked": row.num_gridded_checked,
+                "numInSituMatched": row.num_insitu_matched,
+                "numInSituChecked": row.num_insitu_checked,
+                "timeToComplete": row.time_to_complete
+            }
+
+        raise Exception("Execution not found with id '%s'" % id)
+
+    def __retrieveParams(self, id):
+        cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
+        rows = self._session.execute(cql, (id,))
+        for row in rows:
+            return {
+                "primary": row.primary_dataset,
+                "matchup": row.matchup_datasets.split(","),
+                "depthMin": row.depth_min,
+                "depthMax": row.depth_max,
+                "timeTolerance": row.time_tolerance,
+                "radiusTolerance": row.radius_tolerance,
+                "startTime": row.start_time.replace(tzinfo=UTC),
+                "endTime": row.end_time.replace(tzinfo=UTC),
+                "platforms": row.platforms,
+                "bbox": row.bounding_box,
+                "parameter": row.parameter
+            }
+
+        raise Exception("Execution not found with id '%s'" % id)
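
Because AbstractResultsContainer implements __enter__/__exit__, ResultsStorage and ResultsRetrieval are meant to be used as context managers so the Cassandra cluster connection is always shut down. A minimal usage sketch follows; the dataset name, measurement values, and e-mail address are illustrative placeholders chosen only to match the dictionary keys the insert methods read:

    from datetime import datetime
    from webservice.algorithms.doms.ResultsStorage import ResultsStorage, ResultsRetrieval

    now = datetime.utcnow()
    with ResultsStorage() as storage:
        execution_id = storage.insertResults(
            results=[{"id": "obs-1", "x": -120.5, "y": 33.2, "source": "samos",
                      "time": now, "platform": "ship", "sea_water_temperature": 18.7}],
            params={"primary": "sst_dataset", "matchup": ["samos"],
                    "timeTolerance": 86400, "radiusTolerance": 1000.0,
                    "startTime": now, "endTime": now, "platforms": "1,2,3",
                    "bbox": "-125,30,-120,35", "parameter": "sst"},
            stats={"numGriddedMatched": 1, "numGriddedChecked": 10,
                   "numInSituMatched": 1, "numInSituRecords": 5,
                   "timeToComplete": 42},
            startTime=now, completeTime=datetime.utcnow(),
            userEmail="user@example.com")

    # The stored execution can later be read back by its UUID
    with ResultsRetrieval() as retrieval:
        params, stats, data = retrieval.retrieveResults(execution_id)
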
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/StatsQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/StatsQuery.py b/analysis/webservice/algorithms/doms/StatsQuery.py
new file mode 100644
index 0000000..fae8639
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/StatsQuery.py
@@ -0,0 +1,52 @@
+from webservice.NexusHandler import nexus_handler
+
+import BaseDomsHandler
+import datafetch
+
+@nexus_handler
+class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS In-Situ Stats Lookup"
+    path = "/domsstats"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        source = computeOptions.get_argument("source", None)
+        startTime = computeOptions.get_argument("s", None)
+        endTime = computeOptions.get_argument("e", None)
+        bbox = computeOptions.get_argument("b", None)
+        timeTolerance = computeOptions.get_float_arg("tt")
+        depth_min = computeOptions.get_float_arg("depthMin", default=None)
+        depth_max = computeOptions.get_float_arg("depthMax", default=None)
+        radiusTolerance = computeOptions.get_float_arg("rt")
+        platforms = computeOptions.get_argument("platforms", None)
+
+        endpoint = self.getDataSourceByName(source)
+        if endpoint is None:
+            raise Exception("Source '%s' not found" % source)
+
+        count, bounds = datafetch.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+        args = {
+            "source": source,
+            "startTime": startTime,
+            "endTime": endTime,
+            "bbox": bbox,
+            "timeTolerance": timeTolerance,
+            "depthMin": depth_min,
+            "depthMax": depth_max,
+            "radiusTolerance": radiusTolerance,
+            "platforms": platforms
+        }
+
+        return BaseDomsHandler.DomsQueryResults(results={}, args=args, details={}, bounds=bounds, count=count,
+                                                computeOptions=None)
\ No newline at end of file
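
Once registered by the @nexus_handler decorator, the handler above serves GET requests on /domsstats and replies with the DomsQueryResults JSON (executionId, data, params, bounds, count, details). A client-side sketch, assuming a NEXUS webservice listening on localhost:8083 (host, port, and parameter values here are illustrative):

    import requests

    resp = requests.get("http://localhost:8083/domsstats", params={
        "source": "samos",            # must match an ENDPOINTS name in config.py
        "s": "2012-08-01T00:00:00Z",  # start time
        "e": "2012-08-02T00:00:00Z",  # end time
        "b": "-125,30,-120,35",       # bounding box
        "tt": 86400,                  # time tolerance
        "rt": 1000.0,                 # radius tolerance
        "platforms": "1,2,3"})
    print(resp.json()["count"])
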
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ValuesQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ValuesQuery.py b/analysis/webservice/algorithms/doms/ValuesQuery.py
new file mode 100644
index 0000000..9a0f8af
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ValuesQuery.py
@@ -0,0 +1,56 @@
+from webservice.NexusHandler import nexus_handler
+
+import BaseDomsHandler
+import datafetch
+
+@nexus_handler
+class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS In-Situ Value Lookup"
+    path = "/domsvalues"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        source = computeOptions.get_argument("source", None)
+        startTime = computeOptions.get_start_datetime()
+        endTime = computeOptions.get_end_datetime()
+        bbox = computeOptions.get_argument("b", None)
+        timeTolerance = computeOptions.get_float_arg("tt")
+        depth_min = computeOptions.get_float_arg("depthMin", default=None)
+        depth_max = computeOptions.get_float_arg("depthMax", default=None)
+        radiusTolerance = computeOptions.get_float_arg("rt")
+        platforms = computeOptions.get_argument("platforms", "")
+
+        endpoint = self.getDataSourceByName(source)
+        if endpoint is None:
+            raise Exception("Source '%s' not found" % source)
+
+        values, bounds = datafetch.getValues(endpoint,
+                                             startTime.strftime('%Y-%m-%dT%H:%M:%SZ'),
+                                             endTime.strftime('%Y-%m-%dT%H:%M:%SZ'),
+                                             bbox, depth_min, depth_max, platforms,
+                                             placeholders=True)
+        count = len(values)
+
+        args = {
+            "source": source,
+            "startTime": startTime,
+            "endTime": endTime,
+            "bbox": bbox,
+            "timeTolerance": timeTolerance,
+            "depthMin": depth_min,
+            "depthMax": depth_max,
+            "radiusTolerance": radiusTolerance,
+            "platforms": platforms
+        }
+
+        return BaseDomsHandler.DomsQueryResults(results=values, args=args, bounds=bounds, details={}, count=count,
+                                                computeOptions=None)
\ No newline at end of file
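
/domsvalues is called the same way, except that the start and end times come from computeOptions.get_start_datetime()/get_end_datetime() rather than raw "s"/"e" arguments; the "startTime"/"endTime" parameter names in this sketch are an assumption about how those helpers read the request:

    import requests

    resp = requests.get("http://localhost:8083/domsvalues", params={
        "source": "spurs",
        "startTime": "2012-09-15T00:00:00Z",  # assumed name parsed by get_start_datetime()
        "endTime": "2012-09-16T00:00:00Z",    # assumed name parsed by get_end_datetime()
        "b": "-40,25,-35,30",
        "tt": 86400,
        "rt": 1000.0,
        "platforms": "1,2,3"})
    matched_values = resp.json()["data"]
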
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
new file mode 100644
index 0000000..10f4434
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -0,0 +1,22 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import BaseDomsHandler
+import config
+import datafetch
+import DatasetListQuery
+import DomsInitialization
+import fetchedgeimpl
+import geo
+import MatchupQuery
+import MetadataQuery
+import ResultsPlotQuery
+import ResultsRetrieval
+import ResultsStorage
+import StatsQuery
+import values
+import ValuesQuery
+import workerthread
+import insitusubset
+import subsetter
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/config.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
new file mode 100644
index 0000000..ff97e4f
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -0,0 +1,83 @@
+ENDPOINTS = [
+    {
+        "name": "samos",
+        "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 1000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
+    },
+    {
+        "name": "spurs",
+        "url": "https://doms.jpl.nasa.gov/ws/search/spurs",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 25000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
+    },
+    {
+        "name": "icoads",
+        "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 1000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
+    },
+    {
+        "name": "spurs2",
+        "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 25000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
+    }
+]
+
+import os
+
+# In the dev environment only the "spurs" endpoint differs from the defaults
+# above: it points at a locally running search service.
+try:
+    if os.environ['ENV'] == 'dev':
+        for endpoint in ENDPOINTS:
+            if endpoint["name"] == "spurs":
+                endpoint["url"] = "http://127.0.0.1:8890/ws/search/spurs"
+except KeyError:
+    pass
+
+
+def getEndpointByName(name):
+    for endpoint in ENDPOINTS:
+        if endpoint["name"].upper() == name.upper():
+            return endpoint
+    return None
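
Since the dev override runs at import time, ENV must be set before the module is first imported. A small sketch of selecting the dev endpoints and looking one up (the import path mirrors this file's location in the tree and is an assumption):

    import os
    os.environ["ENV"] = "dev"  # must happen before the first import of config

    from webservice.algorithms.doms import config

    endpoint = config.getEndpointByName("SAMOS")  # lookup is case-insensitive
    if endpoint is not None:
        print(endpoint["url"])  # http://doms.coaps.fsu.edu:8890/ws/search/samos
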
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/datafetch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/datafetch.py b/analysis/webservice/algorithms/doms/datafetch.py
new file mode 100644
index 0000000..eee6b51
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/datafetch.py
@@ -0,0 +1,29 @@
+import fetchedgeimpl
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    # Fetch from each endpoint in turn and concatenate the results
+    data = []
+    for endpoint in endpoints:
+        data.extend(__fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms))
+    return data
+
+
+def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    # Accept either a single endpoint definition or a list of them
+    if isinstance(endpoint, list):
+        return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+    return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+    return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders)
\ No newline at end of file
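
fetchData accepts either a single endpoint definition or a list of them, concatenating the results in the list case. A sketch, assuming the configured services are reachable:

    import config
    import datafetch

    samos = config.getEndpointByName("samos")
    icoads = config.getEndpointByName("icoads")

    # One source
    data = datafetch.fetchData(samos, "2012-08-01T00:00:00Z", "2012-08-02T00:00:00Z",
                               "-125,30,-120,35", None, None)

    # Several sources: results from each endpoint are concatenated
    combined = datafetch.fetchData([samos, icoads], "2012-08-01T00:00:00Z",
                                   "2012-08-02T00:00:00Z", "-125,30,-120,35", None, None)
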
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/domsconfig.ini
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini b/analysis/webservice/algorithms/doms/domsconfig.ini
new file mode 100644
index 0000000..6e113c3
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/domsconfig.ini
@@ -0,0 +1,13 @@
+[cassandra]
+host=127.0.0.1
+keyspace=doms
+local_datacenter=datacenter1
+protocol_version=3
+
+
+[cassandraDD]
+host=128.149.115.178,128.149.115.173,128.149.115.176,128.149.115.175,128.149.115.172,128.149.115.174,128.149.115.177
+keyspace=doms
+local_datacenter=B600
+protocol_version=3
+
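
AbstractResultsContainer reads the [cassandra] section of this file through pkg_resources, so the ini must ship inside the package. The same lookup done by hand (the package path is inferred from the file's location and is an assumption):

    import ConfigParser
    import pkg_resources

    cfg = ConfigParser.RawConfigParser()
    cfg.readfp(pkg_resources.resource_stream("webservice.algorithms.doms", "domsconfig.ini"))
    print(cfg.get("cassandra", "host"))      # 127.0.0.1
    print(cfg.get("cassandra", "keyspace"))  # doms
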