Posted to commits@sdap.apache.org by tl...@apache.org on 2020/08/11 01:59:48 UTC
[incubator-sdap-nexus] 25/28: revert doms
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 121f2afe8faff81568655f74fb58fbb6d6d3718a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 15:24:10 2020 -0700
revert doms
---
.../webservice/algorithms/doms/BaseDomsHandler.py | 635 +++++++++++++++++++++
.../webservice/algorithms/doms/DatasetListQuery.py | 116 ++++
.../algorithms/doms/DomsInitialization.py | 164 ++++++
.../webservice/algorithms/doms/MatchupQuery.py | 452 +++++++++++++++
.../webservice/algorithms/doms/MetadataQuery.py | 65 +++
.../webservice/algorithms/doms/ResultsPlotQuery.py | 55 ++
.../webservice/algorithms/doms/ResultsRetrieval.py | 49 ++
.../webservice/algorithms/doms/ResultsStorage.py | 286 ++++++++++
analysis/webservice/algorithms/doms/StatsQuery.py | 63 ++
analysis/webservice/algorithms/doms/ValuesQuery.py | 72 +++
analysis/webservice/algorithms/doms/__init__.py | 34 ++
analysis/webservice/algorithms/doms/config.py | 109 ++++
analysis/webservice/algorithms/doms/datafetch.py | 47 ++
.../algorithms/doms/domsconfig.ini.default | 15 +
.../webservice/algorithms/doms/fetchedgeimpl.py | 217 +++++++
analysis/webservice/algorithms/doms/geo.py | 129 +++++
.../webservice/algorithms/doms/histogramplot.py | 127 +++++
.../webservice/algorithms/doms/insitusubset.py | 263 +++++++++
analysis/webservice/algorithms/doms/mapplot.py | 175 ++++++
analysis/webservice/algorithms/doms/scatterplot.py | 118 ++++
analysis/webservice/algorithms/doms/subsetter.py | 260 +++++++++
analysis/webservice/algorithms/doms/values.py | 72 +++
.../webservice/algorithms/doms/workerthread.py | 61 ++
23 files changed, 3584 insertions(+)
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
new file mode 100644
index 0000000..d07f929
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -0,0 +1,635 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import StringIO
+import os
+import csv
+import json
+from datetime import datetime
+import time
+from decimal import Decimal
+
+import numpy as np
+from pytz import timezone, UTC
+
+import config
+import geo
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
+from webservice.webmodel import NexusResults
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+try:
+ from osgeo import gdal
+ from osgeo.gdalnumeric import *
+except ImportError:
+ import gdal
+ from gdalnumeric import *
+
+from netCDF4 import Dataset
+import netCDF4
+import tempfile
+
+
+class BaseDomsQueryCalcHandler(BaseHandler):
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def getDataSourceByName(self, source):
+ for s in config.ENDPOINTS:
+ if s["name"] == source:
+ return s
+ return None
+
+ def _does_datasource_exist(self, ds):
+ for endpoint in config.ENDPOINTS:
+ if endpoint["name"] == ds:
+ return True
+ return False
+
+
+class DomsEncoder(json.JSONEncoder):
+ def __init__(self, **args):
+ json.JSONEncoder.__init__(self, **args)
+
+ def default(self, obj):
+ # print 'MyEncoder.default() called'
+ # print type(obj)
+        if isinstance(obj, float) and np.isnan(obj):
+            return None  # a bare "obj == np.nan" comparison is always False; use isnan
+ elif isinstance(obj, datetime):
+ return long((obj - EPOCH).total_seconds())
+ elif isinstance(obj, Decimal):
+ return str(obj)
+ else:
+ return json.JSONEncoder.default(self, obj)
+
+
+class DomsQueryResults(NexusResults):
+ def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
+ executionId=None, status_code=200):
+ NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
+ status_code=status_code)
+ self.__args = args
+ self.__bounds = bounds
+ self.__count = count
+ self.__details = details
+ self.__executionId = str(executionId)
+
+ def toJson(self):
+ bounds = self.__bounds.toMap() if self.__bounds is not None else {}
+ return json.dumps(
+ {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds,
+ "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder)
+
+ def toCSV(self):
+ return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
+
+ def toNetCDF(self):
+ return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
+
+
+class DomsCSVFormatter:
+ @staticmethod
+ def create(executionId, results, params, details):
+
+ csv_mem_file = StringIO.StringIO()
+ try:
+ DomsCSVFormatter.__addConstants(csv_mem_file)
+ DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details)
+ csv.writer(csv_mem_file).writerow([])
+
+ DomsCSVFormatter.__packValues(csv_mem_file, results, params)
+
+ csv_out = csv_mem_file.getvalue()
+ finally:
+ csv_mem_file.close()
+
+ return csv_out
+
+ @staticmethod
+ def __packValues(csv_mem_file, results, params):
+
+ writer = csv.writer(csv_mem_file)
+
+ headers = [
+ # Primary
+ "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
+ "sea_surface_salinity (1e-3)", "sea_surface_temperature (degree_C)", "wind_speed (m s-1)", "wind_direction",
+ "wind_u (m s-1)", "wind_v (m s-1)",
+ # Match
+ "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
+ "depth (m)", "sea_water_salinity (1e-3)",
+ "sea_water_temperature (degree_C)", "wind_speed (m s-1)",
+ "wind_direction", "wind_u (m s-1)", "wind_v (m s-1)"
+ ]
+
+ writer.writerow(headers)
+
+ #
+ # Only include the depth variable related to the match-up parameter. If the match-up parameter
+ # is not sss or sst then do not include any depth data, just fill values.
+ #
+ if params["parameter"] == "sss":
+ depth = "sea_water_salinity_depth"
+ elif params["parameter"] == "sst":
+ depth = "sea_water_temperature_depth"
+ else:
+ depth = "NO_DEPTH"
+
+ for primaryValue in results:
+ for matchup in primaryValue["matches"]:
+ row = [
+ # Primary
+ primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]),
+ primaryValue["time"].strftime(ISO_8601), primaryValue["platform"],
+ primaryValue.get("sea_water_salinity", ""), primaryValue.get("sea_water_temperature", ""),
+ primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""),
+ primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""),
+
+ # Matchup
+ matchup["id"], matchup["source"], matchup["x"], matchup["y"],
+ matchup["time"].strftime(ISO_8601), matchup["platform"],
+ matchup.get(depth, ""), matchup.get("sea_water_salinity", ""),
+ matchup.get("sea_water_temperature", ""),
+ matchup.get("wind_speed", ""), matchup.get("wind_direction", ""),
+ matchup.get("wind_u", ""), matchup.get("wind_v", ""),
+ ]
+ writer.writerow(row)
+
+ @staticmethod
+ def __addConstants(csvfile):
+
+ global_attrs = [
+ {"Global Attribute": "product_version", "Value": "1.0"},
+ {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"},
+ {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"},
+ {"Global Attribute": "history",
+ "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"},
+ {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"},
+ {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"},
+ {"Global Attribute": "standard_name_vocabulary",
+ "Value": "CF Standard Name Table v27, BODC controlled vocabulary"},
+ {"Global Attribute": "cdm_data_type", "Value": "Point/Profile, Swath/Grid"},
+ {"Global Attribute": "processing_level", "Value": "4"},
+ {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"},
+ {"Global Attribute": "keywords_vocabulary",
+ "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"},
+ # TODO What should the keywords be?
+ {"Global Attribute": "keywords", "Value": "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, "
+ "NASA/JPL/PODAAC, FSU/COAPS, UCAR/NCAR, SALINITY, "
+ "SEA SURFACE TEMPERATURE, SURFACE WINDS"},
+ {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"},
+ {"Global Attribute": "creator_email", "Value": "podaac@podaac.jpl.nasa.gov"},
+ {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"},
+ {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"},
+ {"Global Attribute": "publisher_email", "Value": "podaac@podaac.jpl.nasa.gov"},
+ {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"},
+ {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."},
+ ]
+
+ writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
+
+ writer.writerows(global_attrs)
+
+ @staticmethod
+ def __addDynamicAttrs(csvfile, executionId, results, params, details):
+
+ platforms = set()
+ for primaryValue in results:
+ platforms.add(primaryValue['platform'])
+ for match in primaryValue['matches']:
+ platforms.add(match['platform'])
+
+ # insituDatasets = params["matchup"].split(",")
+ insituDatasets = params["matchup"]
+ insituLinks = set()
+ for insitu in insituDatasets:
+ insituLinks.add(config.METADATA_LINKS[insitu])
+
+
+ global_attrs = [
+ {"Global Attribute": "Platform", "Value": ', '.join(platforms)},
+ {"Global Attribute": "time_coverage_start",
+ "Value": params["startTime"].strftime(ISO_8601)},
+ {"Global Attribute": "time_coverage_end",
+ "Value": params["endTime"].strftime(ISO_8601)},
+ {"Global Attribute": "time_coverage_resolution", "Value": "point"},
+
+ {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]},
+ {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]},
+ {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]},
+ {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]},
+ {"Global Attribute": "geospatial_lat_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_lon_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"},
+ {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"},
+
+ {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]},
+ {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]},
+ {"Global Attribute": "geospatial_vertical_units", "Value": "m"},
+ {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"},
+ {"Global Attribute": "geospatial_vertical_positive", "Value": "down"},
+
+ {"Global Attribute": "DOMS_matchID", "Value": executionId},
+ {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60},
+ {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"},
+
+ {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]},
+ {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]},
+ {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"},
+
+ {"Global Attribute": "DOMS_DatasetMetadata", "Value": ', '.join(insituLinks)},
+ {"Global Attribute": "DOMS_primary", "Value": params["primary"]},
+ {"Global Attribute": "DOMS_match_up", "Value": params["matchup"]},
+ {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")},
+
+ {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]},
+ {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"},
+ {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]},
+ {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]},
+
+ {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
+ {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
+
+ {"Global Attribute": "URI_Matchup", "Value": "http://{webservice}/domsresults?id=" + executionId + "&output=CSV"},
+ ]
+
+ writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
+
+ writer.writerows(global_attrs)
+
+
+class DomsNetCDFFormatter:
+ @staticmethod
+ def create(executionId, results, params, details):
+
+ t = tempfile.mkstemp(prefix="doms_", suffix=".nc")
+ tempFileName = t[1]
+
+ dataset = Dataset(tempFileName, "w", format="NETCDF4")
+ dataset.DOMS_matchID = executionId
+ DomsNetCDFFormatter.__addNetCDFConstants(dataset)
+
+ dataset.date_modified = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
+ dataset.date_created = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
+ dataset.time_coverage_start = params["startTime"].strftime(ISO_8601)
+ dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
+ dataset.time_coverage_resolution = "point"
+ dataset.DOMS_match_up = params["matchup"]
+ dataset.DOMS_num_matchup_matched = details["numInSituMatched"]
+ dataset.DOMS_num_primary_matched = details["numGriddedMatched"]
+
+ bbox = geo.BoundingBox(asString=params["bbox"])
+ dataset.geospatial_lat_max = bbox.north
+ dataset.geospatial_lat_min = bbox.south
+ dataset.geospatial_lon_max = bbox.east
+ dataset.geospatial_lon_min = bbox.west
+ dataset.geospatial_lat_resolution = "point"
+ dataset.geospatial_lon_resolution = "point"
+ dataset.geospatial_lat_units = "degrees_north"
+ dataset.geospatial_lon_units = "degrees_east"
+ dataset.geospatial_vertical_min = float(params["depthMin"])
+ dataset.geospatial_vertical_max = float(params["depthMax"])
+ dataset.geospatial_vertical_units = "m"
+ dataset.geospatial_vertical_resolution = "point"
+ dataset.geospatial_vertical_positive = "down"
+
+ dataset.DOMS_TimeWindow = params["timeTolerance"] / 60 / 60
+ dataset.DOMS_TimeWindow_Units = "hours"
+ dataset.DOMS_SearchRadius = float(params["radiusTolerance"])
+ dataset.DOMS_SearchRadius_Units = "m"
+ # dataset.URI_Subset = "http://webservice subsetting query request"
+ dataset.URI_Matchup = "http://{webservice}/domsresults?id=" + executionId + "&output=NETCDF"
+ dataset.DOMS_ParameterPrimary = params["parameter"] if "parameter" in params else ""
+ dataset.DOMS_platforms = params["platforms"]
+ dataset.DOMS_primary = params["primary"]
+ dataset.DOMS_time_to_complete = details["timeToComplete"]
+ dataset.DOMS_time_to_complete_units = "seconds"
+
+ insituDatasets = params["matchup"]
+ insituLinks = set()
+ for insitu in insituDatasets:
+ insituLinks.add(config.METADATA_LINKS[insitu])
+ dataset.DOMS_DatasetMetadata = ', '.join(insituLinks)
+
+ platforms = set()
+ for primaryValue in results:
+ platforms.add(primaryValue['platform'])
+ for match in primaryValue['matches']:
+ platforms.add(match['platform'])
+ dataset.platform = ', '.join(platforms)
+
+ satellite_group_name = "SatelliteData"
+ insitu_group_name = "InsituData"
+
+ #Create Satellite group, variables, and attributes
+ satelliteGroup = dataset.createGroup(satellite_group_name)
+ satelliteWriter = DomsNetCDFValueWriter(satelliteGroup, params["parameter"])
+
+ # Create InSitu group, variables, and attributes
+ insituGroup = dataset.createGroup(insitu_group_name)
+ insituWriter = DomsNetCDFValueWriter(insituGroup, params["parameter"])
+
+ # Add data to Insitu and Satellite groups, generate array of match ID pairs
+ matches = DomsNetCDFFormatter.__writeResults(results, satelliteWriter, insituWriter)
+ dataset.createDimension("MatchedRecords", size=None)
+ dataset.createDimension("MatchedGroups", size=2)
+ matchArray = dataset.createVariable("matchIDs", "f4", ("MatchedRecords", "MatchedGroups"))
+ matchArray[:] = matches
+
+ dataset.close()
+ f = open(tempFileName, "rb")
+ data = f.read()
+ f.close()
+ os.unlink(tempFileName)
+ return data
+
+ @staticmethod
+ def __addNetCDFConstants(dataset):
+ dataset.product_version = "1.0"
+ dataset.Conventions = "CF-1.6, ACDD-1.3"
+ dataset.title = "DOMS satellite-insitu machup output file"
+ dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+ dataset.institution = "JPL, FSU, NCAR"
+ dataset.source = "doms.jpl.nasa.gov"
+        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
+ dataset.cdm_data_type = "Point/Profile, Swath/Grid"
+ dataset.processing_level = "4"
+ dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+ dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
+ dataset.keywords = "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, NASA/JPL/PODAAC, " \
+ "FSU/COAPS, UCAR/NCAR, SALINITY, SEA SURFACE TEMPERATURE, SURFACE WINDS"
+ dataset.creator_name = "NASA PO.DAAC"
+ dataset.creator_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.creator_url = "https://podaac.jpl.nasa.gov/"
+ dataset.publisher_name = "NASA PO.DAAC"
+ dataset.publisher_email = "podaac@podaac.jpl.nasa.gov"
+ dataset.publisher_url = "https://podaac.jpl.nasa.gov"
+ dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."
+
+ @staticmethod
+ def __writeResults(results, satelliteWriter, insituWriter):
+ ids = {}
+ matches = []
+ insituIndex = 0
+
+ #
+ # Loop through all of the results, add each satellite data point to the array
+ #
+ for r in range(0, len(results)):
+ result = results[r]
+ satelliteWriter.addData(result)
+
+ # Add each match only if it is not already in the array of in situ points
+ for match in result["matches"]:
+ if match["id"] not in ids:
+ ids[match["id"]] = insituIndex
+ insituIndex += 1
+ insituWriter.addData(match)
+
+                # Append an index pair of (satellite, in situ) to the array of matches
+ matches.append((r, ids[match["id"]]))
+
+ # Add data/write to the netCDF file
+ satelliteWriter.writeGroup()
+ insituWriter.writeGroup()
+
+ return matches
+
+
+class DomsNetCDFValueWriter:
+ def __init__(self, group, matchup_parameter):
+ group.createDimension("dim", size=None)
+ self.group = group
+
+ self.lat = []
+ self.lon = []
+ self.time = []
+ self.sea_water_salinity = []
+ self.wind_speed = []
+ self.wind_u = []
+ self.wind_v = []
+ self.wind_direction = []
+ self.sea_water_temperature = []
+ self.depth = []
+
+ self.satellite_group_name = "SatelliteData"
+ self.insitu_group_name = "InsituData"
+
+ #
+ # Only include the depth variable related to the match-up parameter. If the match-up parameter is
+ # not sss or sst then do not include any depth data, just fill values.
+ #
+ if matchup_parameter == "sss":
+ self.matchup_depth = "sea_water_salinity_depth"
+ elif matchup_parameter == "sst":
+ self.matchup_depth = "sea_water_temperature_depth"
+ else:
+ self.matchup_depth = "NO_DEPTH"
+
+ def addData(self, value):
+ self.lat.append(value.get("y", None))
+ self.lon.append(value.get("x", None))
+ self.time.append(time.mktime(value.get("time").timetuple()))
+ self.sea_water_salinity.append(value.get("sea_water_salinity", None))
+ self.wind_speed.append(value.get("wind_speed", None))
+ self.wind_u.append(value.get("wind_u", None))
+ self.wind_v.append(value.get("wind_v", None))
+ self.wind_direction.append(value.get("wind_direction", None))
+ self.sea_water_temperature.append(value.get("sea_water_temperature", None))
+ self.depth.append(value.get(self.matchup_depth, None))
+
+ def writeGroup(self):
+ #
+ # Create variables, enrich with attributes, and add data
+ #
+ lonVar = self.group.createVariable("lon", "f4", ("dim",), fill_value=-32767.0)
+ latVar = self.group.createVariable("lat", "f4", ("dim",), fill_value=-32767.0)
+ timeVar = self.group.createVariable("time", "f4", ("dim",), fill_value=-32767.0)
+
+ self.__enrichLon(lonVar, min(self.lon), max(self.lon))
+ self.__enrichLat(latVar, min(self.lat), max(self.lat))
+ self.__enrichTime(timeVar)
+
+ latVar[:] = self.lat
+ lonVar[:] = self.lon
+ timeVar[:] = self.time
+
+ if self.sea_water_salinity.count(None) != len(self.sea_water_salinity):
+ if self.group.name == self.satellite_group_name:
+ sssVar = self.group.createVariable("SeaSurfaceSalinity", "f4", ("dim",), fill_value=-32767.0)
+ self.__enrichSSSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
+ else: # group.name == self.insitu_group_name
+ sssVar = self.group.createVariable("SeaWaterSalinity", "f4", ("dim",), fill_value=-32767.0)
+ self.__enrichSWSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
+ sssVar[:] = self.sea_water_salinity
+
+ if self.wind_speed.count(None) != len(self.wind_speed):
+ windSpeedVar = self.group.createVariable("WindSpeed", "f4", ("dim",), fill_value=-32767.0)
+ self.__enrichWindSpeed(windSpeedVar, self.__calcMin(self.wind_speed), max(self.wind_speed))
+ windSpeedVar[:] = self.wind_speed
+
+ if self.wind_u.count(None) != len(self.wind_u):
+ windUVar = self.group.createVariable("WindU", "f4", ("dim",), fill_value=-32767.0)
+ windUVar[:] = self.wind_u
+ self.__enrichWindU(windUVar, self.__calcMin(self.wind_u), max(self.wind_u))
+
+ if self.wind_v.count(None) != len(self.wind_v):
+ windVVar = self.group.createVariable("WindV", "f4", ("dim",), fill_value=-32767.0)
+ windVVar[:] = self.wind_v
+ self.__enrichWindV(windVVar, self.__calcMin(self.wind_v), max(self.wind_v))
+
+ if self.wind_direction.count(None) != len(self.wind_direction):
+ windDirVar = self.group.createVariable("WindDirection", "f4", ("dim",), fill_value=-32767.0)
+ windDirVar[:] = self.wind_direction
+ self.__enrichWindDir(windDirVar)
+
+ if self.sea_water_temperature.count(None) != len(self.sea_water_temperature):
+ if self.group.name == self.satellite_group_name:
+ tempVar = self.group.createVariable("SeaSurfaceTemp", "f4", ("dim",), fill_value=-32767.0)
+ self.__enrichSurfaceTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
+ else:
+ tempVar = self.group.createVariable("SeaWaterTemp", "f4", ("dim",), fill_value=-32767.0)
+ self.__enrichWaterTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
+ tempVar[:] = self.sea_water_temperature
+
+ if self.group.name == self.insitu_group_name:
+ depthVar = self.group.createVariable("Depth", "f4", ("dim",), fill_value=-32767.0)
+
+ if self.depth.count(None) != len(self.depth):
+ self.__enrichDepth(depthVar, self.__calcMin(self.depth), max(self.depth))
+ depthVar[:] = self.depth
+ else:
+ # If depth has no data, set all values to 0
+ tempDepth = [0 for x in range(len(self.depth))]
+ depthVar[:] = tempDepth
+
+ #
+    # Lists may include None values; to compute the min these must be filtered out
+ #
+ @staticmethod
+ def __calcMin(var):
+ return min(x for x in var if x is not None)
+
+
+ #
+ # Add attributes to each variable
+ #
+ @staticmethod
+ def __enrichLon(var, var_min, var_max):
+ var.long_name = "Longitude"
+ var.standard_name = "longitude"
+ var.axis = "X"
+ var.units = "degrees_east"
+ var.valid_min = var_min
+ var.valid_max = var_max
+
+ @staticmethod
+ def __enrichLat(var, var_min, var_max):
+ var.long_name = "Latitude"
+ var.standard_name = "latitude"
+ var.axis = "Y"
+ var.units = "degrees_north"
+ var.valid_min = var_min
+ var.valid_max = var_max
+
+ @staticmethod
+ def __enrichTime(var):
+ var.long_name = "Time"
+ var.standard_name = "time"
+ var.axis = "T"
+ var.units = "seconds since 1970-01-01 00:00:00 0:00"
+
+ @staticmethod
+ def __enrichSSSMeasurements(var, var_min, var_max):
+ var.long_name = "Sea surface salinity"
+ var.standard_name = "sea_surface_salinity"
+ var.units = "1e-3"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat time"
+
+ @staticmethod
+ def __enrichSWSMeasurements(var, var_min, var_max):
+ var.long_name = "Sea water salinity"
+ var.standard_name = "sea_water_salinity"
+ var.units = "1e-3"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat depth time"
+
+ @staticmethod
+ def __enrichDepth(var, var_min, var_max):
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.units = "m"
+ var.long_name = "Depth"
+ var.standard_name = "depth"
+ var.axis = "Z"
+ var.positive = "Down"
+
+ @staticmethod
+ def __enrichWindSpeed(var, var_min, var_max):
+ var.long_name = "Wind speed"
+ var.standard_name = "wind_speed"
+ var.units = "m s-1"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat depth time"
+
+ @staticmethod
+ def __enrichWindU(var, var_min, var_max):
+ var.long_name = "Eastward wind"
+ var.standard_name = "eastward_wind"
+ var.units = "m s-1"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat depth time"
+
+ @staticmethod
+ def __enrichWindV(var, var_min, var_max):
+ var.long_name = "Northward wind"
+ var.standard_name = "northward_wind"
+ var.units = "m s-1"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat depth time"
+
+ @staticmethod
+ def __enrichWaterTemp(var, var_min, var_max):
+ var.long_name = "Sea water temperature"
+ var.standard_name = "sea_water_temperature"
+ var.units = "degree_C"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat depth time"
+
+ @staticmethod
+ def __enrichSurfaceTemp(var, var_min, var_max):
+ var.long_name = "Sea surface temperature"
+ var.standard_name = "sea_surface_temperature"
+ var.units = "degree_C"
+ var.valid_min = var_min
+ var.valid_max = var_max
+ var.coordinates = "lon lat time"
+
+ @staticmethod
+ def __enrichWindDir(var):
+ var.long_name = "Wind from direction"
+ var.standard_name = "wind_from_direction"
+ var.units = "degree"
+ var.coordinates = "lon lat depth time"
diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py
new file mode 100644
index 0000000..ac7f263
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/DatasetListQuery.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import traceback
+
+import requests
+
+import BaseDomsHandler
+import config
+import values
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import cached
+
+
+@nexus_handler
+class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS Dataset Listing"
+ path = "/domslist"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def getFacetsForInsituSource(self, source):
+ url = source["url"]
+
+ params = {
+ "facet": "true",
+ "stats": "true",
+ "startIndex": 0,
+ "itemsPerPage": 0
+ }
+ try:
+ r = requests.get(url, params=params)
+ results = json.loads(r.text)
+
+ depths = None
+ if "stats_fields" in results and "depth" in results["stats_fields"]:
+ depths = results["stats_fields"]["depth"]
+
+ for facet in results["facets"]:
+ field = facet["field"]
+ for value in facet["values"]:
+ value["value"] = values.getDescByListNameAndId(field, int(value["value"]))
+
+ return depths, results["facets"]
+ except: # KMG: Don't eat the exception. Add better handling...
+ traceback.print_exc()
+ return None, None
+
+ def getMetadataUrlForDataset(self, dataset):
+ datasetSpec = config.getEndpointByName(dataset)
+ if datasetSpec is not None:
+ return datasetSpec["metadataUrl"]
+ else:
+
+ # KMG: NOT a good hack
+ if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
+ dataset = "MUR-JPL-L4-GLOB-v4.1"
+ elif dataset == "SMAP_L2B_SSS":
+ dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
+ elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM":
+ dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0"
+
+ return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
+
+ def getMetadataForSource(self, dataset):
+ try:
+ r = requests.get(self.getMetadataUrlForDataset(dataset))
+ results = json.loads(r.text)
+ return results
+ except:
+ return None
+
+ @cached(ttl=(60 * 60 * 1000)) # 1 hour cached
+ def calc(self, computeOptions, **args):
+
+ satellitesList = self._get_tile_service().get_dataseries_list(simple=True)
+
+ insituList = []
+
+ for satellite in satellitesList:
+ satellite["metadata"] = self.getMetadataForSource(satellite["shortName"])
+
+ for insitu in config.ENDPOINTS:
+ depths, facets = self.getFacetsForInsituSource(insitu)
+ insituList.append({
+ "name": insitu["name"],
+ "endpoint": insitu["url"],
+ "metadata": self.getMetadataForSource(insitu["name"]),
+ "depths": depths,
+ "facets": facets
+ })
+
+ values = {
+ "satellite": satellitesList,
+ "insitu": insituList
+ }
+
+ return BaseDomsHandler.DomsQueryResults(results=values)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
new file mode 100644
index 0000000..2d429ca
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import ConfigParser
+import logging
+
+import pkg_resources
+from cassandra.cluster import Cluster
+from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
+
+from webservice.NexusHandler import nexus_initializer
+
+@nexus_initializer
+class DomsInitializer:
+ def __init__(self):
+ pass
+
+ def init(self, config):
+ log = logging.getLogger(__name__)
+ log.info("*** STARTING DOMS INITIALIZATION ***")
+
+ domsconfig = ConfigParser.SafeConfigParser()
+ domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
+
+ cassHost = domsconfig.get("cassandra", "host")
+ cassPort = domsconfig.get("cassandra", "port")
+ cassKeyspace = domsconfig.get("cassandra", "keyspace")
+ cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+ cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+ cassPolicy = domsconfig.get("cassandra", "dc_policy")
+
+ log.info("Cassandra Host(s): %s" % (cassHost))
+ log.info("Cassandra Keyspace: %s" % (cassKeyspace))
+ log.info("Cassandra Datacenter: %s" % (cassDatacenter))
+ log.info("Cassandra Protocol Version: %s" % (cassVersion))
+ log.info("Cassandra DC Policy: %s" % (cassPolicy))
+
+        if cassPolicy == 'WhiteListRoundRobinPolicy':
+            dc_policy = WhiteListRoundRobinPolicy(cassHost.split(','))
+        else:  # default to DCAwareRoundRobinPolicy so dc_policy is always bound
+            dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+        token_policy = TokenAwarePolicy(dc_policy)
+
+ with Cluster([host for host in cassHost.split(',')], port=int(cassPort), load_balancing_policy=token_policy,
+ protocol_version=cassVersion) as cluster:
+ session = cluster.connect()
+
+ self.createKeyspace(session, cassKeyspace)
+ self.createTables(session)
+
+ def createKeyspace(self, session, cassKeyspace):
+ log = logging.getLogger(__name__)
+ log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
+ session.execute(
+ "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
+ session.set_keyspace(cassKeyspace)
+
+ def createTables(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying DOMS tables")
+ self.createDomsExecutionsTable(session)
+ self.createDomsParamsTable(session)
+ self.createDomsDataTable(session)
+ self.createDomsExecutionStatsTable(session)
+
+ def createDomsExecutionsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_executions table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_executions (
+ id uuid PRIMARY KEY,
+ time_started timestamp,
+ time_completed timestamp,
+ user_email text
+ );
+ """
+ session.execute(cql)
+
+ def createDomsParamsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_params table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_params (
+ execution_id uuid PRIMARY KEY,
+ primary_dataset text,
+ matchup_datasets text,
+ depth_tolerance decimal,
+ depth_min decimal,
+ depth_max decimal,
+ time_tolerance int,
+ radius_tolerance decimal,
+ start_time timestamp,
+ end_time timestamp,
+ platforms text,
+ bounding_box text,
+ parameter text
+ );
+ """
+ session.execute(cql)
+
+ def createDomsDataTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_data table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_data (
+ id uuid,
+ execution_id uuid,
+ value_id text,
+ primary_value_id text,
+ is_primary boolean,
+ x decimal,
+ y decimal,
+ source_dataset text,
+ measurement_time timestamp,
+ platform text,
+ device text,
+ measurement_values map<text, decimal>,
+ PRIMARY KEY (execution_id, is_primary, id)
+ );
+ """
+ session.execute(cql)
+
+ def createDomsExecutionStatsTable(self, session):
+ log = logging.getLogger(__name__)
+ log.info("Verifying doms_execuction_stats table")
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_execution_stats (
+ execution_id uuid PRIMARY KEY,
+ num_gridded_matched int,
+ num_gridded_checked int,
+ num_insitu_matched int,
+ num_insitu_checked int,
+ time_to_complete int
+ );
+ """
+ session.execute(cql)
+
+ @staticmethod
+ def _get_config_files(filename):
+ log = logging.getLogger(__name__)
+ candidates = []
+ extensions = ['.default', '']
+ for extension in extensions:
+ try:
+ candidate = pkg_resources.resource_filename(__name__, filename + extension)
+ candidates.append(candidate)
+ except KeyError as ke:
+ log.warning('configuration file {} not found'.format(filename + extension))
+
+ return candidates
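
DomsInitializer reads every connection setting from the [cassandra] section of domsconfig.ini, falling back to domsconfig.ini.default via _get_config_files above. An illustrative section covering each key the initializer consults (the values are placeholders, not the shipped defaults):

    [cassandra]
    host=localhost
    port=9042
    keyspace=doms
    local_datacenter=datacenter1
    protocol_version=3
    dc_policy=DCAwareRoundRobinPolicy
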
diff --git a/analysis/webservice/algorithms/doms/MatchupQuery.py b/analysis/webservice/algorithms/doms/MatchupQuery.py
new file mode 100644
index 0000000..57a0834
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/MatchupQuery.py
@@ -0,0 +1,452 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import uuid
+from datetime import datetime
+
+import numpy as np
+import utm
+from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
+from scipy import spatial
+
+import BaseDomsHandler
+import ResultsStorage
+import datafetch
+import fetchedgeimpl
+import geo
+import workerthread
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "Experimental Combined DOMS In-Situ Matchup"
+ path = "/domsmatchup"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
+
+ def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms):
+
+ boundsConstrainer = geo.BoundsConstrainer(asString=bbox)
+ threads = []
+ for endpoint in endpoints:
+ thread = workerthread.WorkerThread(datafetch.fetchData,
+ params=(endpoint, startTime, endTime, bbox, depth_min, depth_max))
+ threads.append(thread)
+ workerthread.wait(threads, startFirst=True, poll=0.01)
+
+ data2 = []
+ for thread in threads:
+ data, bounds = thread.results
+ data2 += data
+ boundsConstrainer.testOtherConstrainer(bounds)
+
+ return data2, boundsConstrainer
+
+ def __parseDatetime(self, dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return time
+
+ def calc(self, computeOptions, **args):
+ primary = computeOptions.get_argument("primary", None)
+ matchup = computeOptions.get_argument("matchup", None)
+ startTime = computeOptions.get_argument("s", None)
+ endTime = computeOptions.get_argument("e", None)
+ bbox = computeOptions.get_argument("b", None)
+ timeTolerance = computeOptions.get_float_arg("tt")
+ depth_min = computeOptions.get_float_arg("depthMin", default=None)
+ depth_max = computeOptions.get_float_arg("depthMax", default=None)
+ radiusTolerance = computeOptions.get_float_arg("rt")
+ platforms = computeOptions.get_argument("platforms", None)
+
+ if primary is None or len(primary) == 0:
+ raise Exception("No primary dataset specified")
+
+ if matchup is None or len(matchup) == 0:
+ raise Exception("No matchup datasets specified")
+
+ start = self._now()
+
+ primarySpec = self.getDataSourceByName(primary)
+ if primarySpec is None:
+ raise Exception("Specified primary dataset not found using identifier '%s'" % primary)
+
+ primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+ primaryContext = MatchupContext(primaryData)
+
+ matchupIds = matchup.split(",")
+
+ for matchupId in matchupIds:
+ matchupSpec = self.getDataSourceByName(matchupId)
+
+ if matchupSpec is not None: # Then it's in the in-situ configuration
+ proc = InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min,
+ depth_max,
+ platforms, timeTolerance, radiusTolerance)
+ proc.start()
+ else: # We assume it to be a Nexus tiled dataset
+
+ '''
+ Single Threaded at the moment...
+ '''
+ daysinrange = self._get_tile_service().find_days_in_range_asc(bounds.south, bounds.north, bounds.west,
+ bounds.east, matchupId,
+ self.__parseDatetime(startTime) / 1000,
+ self.__parseDatetime(endTime) / 1000)
+
+ tilesByDay = {}
+ for dayTimestamp in daysinrange:
+ ds1_nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box_at_time(bounds.south, bounds.north,
+ bounds.west, bounds.east,
+ matchupId, dayTimestamp)
+
+ # print "***", type(ds1_nexus_tiles)
+ # print ds1_nexus_tiles[0].__dict__
+ tilesByDay[dayTimestamp] = ds1_nexus_tiles
+
+ primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance)
+
+ matches, numMatches = primaryContext.getFinal(len(matchupIds))
+
+ end = self._now()
+
+ args = {
+ "primary": primary,
+ "matchup": matchupIds,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "timeTolerance": timeTolerance,
+ "depthMin": depth_min,
+ "depthMax": depth_max,
+ "radiusTolerance": radiusTolerance,
+ "platforms": platforms
+ }
+
+ details = {
+ "timeToComplete": (end - start),
+ "numInSituRecords": primaryContext.insituCount,
+ "numInSituMatched": primaryContext.insituMatches,
+ "numGriddedChecked": primaryContext.griddedCount,
+ "numGriddedMatched": primaryContext.griddedMatched
+ }
+
+ with ResultsStorage.ResultsStorage() as resultsStorage:
+ execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start,
+ completeTime=end, userEmail="")
+
+ return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
+
+
+class MatchupContextMap:
+ def __init__(self):
+ pass
+
+ def add(self, context):
+ pass
+
+ def delete(self, context):
+ pass
+
+
+class MatchupContext:
+ def __init__(self, primaryData):
+ self.id = str(uuid.uuid4())
+
+ self.griddedCount = 0
+ self.griddedMatched = 0
+
+ self.insituCount = len(primaryData)
+ self.insituMatches = 0
+
+ self.primary = primaryData
+ for r in self.primary:
+ r["matches"] = []
+
+ self.data = []
+ for s in primaryData:
+ u = utm.from_latlon(s["y"], s["x"])
+ v = (u[0], u[1], 0.0)
+ self.data.append(v)
+
+ if len(self.data) > 0:
+ self.tree = spatial.KDTree(self.data)
+ else:
+ self.tree = None
+
+ def getFinal(self, minMatchesToInclude):
+
+ matched = []
+ ttlMatches = 0
+ for m in self.primary:
+ if len(m["matches"]) >= minMatchesToInclude:
+ matched.append(m)
+ ttlMatches += len(m["matches"])
+
+ return matched, ttlMatches
+
+ def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance):
+ for r in self.primary:
+ foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"],
+ xyTolerance)
+ self.griddedCount += 1
+ self.griddedMatched += len(foundSatNodes)
+ r["matches"].extend(foundSatNodes)
+
+ def processInSitu(self, records, xyTolerance, timeTolerance):
+ if self.tree is not None:
+ for s in records:
+ self.insituCount += 1
+ u = utm.from_latlon(s["y"], s["x"])
+ coords = np.array([u[0], u[1], 0])
+ ball = self.tree.query_ball_point(coords, xyTolerance)
+
+ self.insituMatches += len(ball)
+
+ for i in ball:
+ match = self.primary[i]
+ if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0):
+ match["matches"].append(s)
+
+ def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"):
+ value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName)
+ return value
+
+ def __checkNumber(self, value):
+        if isinstance(value, float) and math.isnan(value):
+ value = None
+ elif value is not None:
+ value = float(value)
+ return value
+
+ def __buildSwathIndexes(self, chunk):
+ latlons = []
+ utms = []
+ indexes = []
+ for i in range(0, len(chunk.latitudes)):
+ _lat = chunk.latitudes[i]
+ if isinstance(_lat, np.ma.core.MaskedConstant):
+ continue
+ for j in range(0, len(chunk.longitudes)):
+ _lon = chunk.longitudes[j]
+ if isinstance(_lon, np.ma.core.MaskedConstant):
+ continue
+
+ value = self.__getChunkValueAtIndex(chunk, (i, j))
+                if isinstance(value, float) and math.isnan(value):
+ continue
+
+ u = utm.from_latlon(_lat, _lon)
+ v = (u[0], u[1], 0.0)
+ latlons.append((_lat, _lon))
+ utms.append(v)
+ indexes.append((i, j))
+
+ tree = None
+ if len(latlons) > 0:
+ tree = spatial.KDTree(utms)
+
+ chunk.swathIndexing = {
+ "tree": tree,
+ "latlons": latlons,
+ "indexes": indexes
+ }
+
+ def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance):
+ foundIndexes = []
+ foundLatLons = []
+
+ if "swathIndexing" not in chunk.__dict__:
+ self.__buildSwathIndexes(chunk)
+
+ tree = chunk.swathIndexing["tree"]
+ if tree is not None:
+ indexes = chunk.swathIndexing["indexes"]
+ latlons = chunk.swathIndexing["latlons"]
+ u = utm.from_latlon(lat, lon)
+ coords = np.array([u[0], u[1], 0])
+ ball = tree.query_ball_point(coords, xyTolerance)
+ for i in ball:
+ foundIndexes.append(indexes[i])
+ foundLatLons.append(latlons[i])
+ return foundIndexes, foundLatLons
+
+ def __getChunkValueAtIndex(self, chunk, index, arrayName=None):
+
+ if arrayName is None or arrayName == "data":
+ data_val = chunk.data[0][index[0]][index[1]]
+ else:
+ data_val = chunk.meta_data[arrayName][0][index[0]][index[1]]
+ return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('Nan')
+
+ def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance):
+ timeDiff = 86400 * 365 * 1000
+ foundNodes = []
+
+ for ts in chunksByDay:
+ chunks = chunksByDay[ts]
+ if abs((ts * 1000) - searchTime) < timeDiff:
+ for chunk in chunks:
+ indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance)
+
+ # for index in indexes:
+ for i in range(0, len(indexes)):
+ index = indexes[i]
+ latlon = latlons[i]
+ sst = None
+ sss = None
+ windSpeed = None
+ windDirection = None
+ windU = None
+ windV = None
+
+ value = self.__getChunkValueAtIndex(chunk, index)
+
+                        if isinstance(value, float) and math.isnan(value):
+ continue
+
+ if "GHRSST" in source:
+ sst = value
+ elif "ASCATB" in source:
+ windU = value
+ elif "SSS" in source: # SMAP
+ sss = value
+
+ if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
+ windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir"))
+ if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
+ windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v"))
+ if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
+ windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed"))
+
+ foundNode = {
+ "sea_water_temperature": sst,
+ "sea_water_salinity": sss,
+ "wind_speed": windSpeed,
+ "wind_direction": windDirection,
+ "wind_u": windU,
+ "wind_v": windV,
+ "time": ts,
+ "x": self.__checkNumber(latlon[1]),
+ "y": self.__checkNumber(latlon[0]),
+ "depth": 0,
+ "sea_water_temperature_depth": 0,
+ "source": source,
+ "id": "%s:%s:%s" % (ts, lat, lon)
+ }
+
+ foundNodes.append(foundNode)
+                    timeDiff = abs((ts * 1000) - searchTime)
+
+ return foundNodes
+
+ def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime):
+
+ timeDiff = 86400 * 365 * 1000
+ foundNodes = []
+
+ for ts in chunksByDay:
+ chunks = chunksByDay[ts]
+ # print chunks
+ # ts = calendar.timegm(chunks.start.utctimetuple()) * 1000
+ if abs((ts * 1000) - searchTime) < timeDiff:
+ value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data")
+ value = self.__checkNumber(value)
+
+ # _Really_ don't like doing it this way...
+
+ sst = None
+ sss = None
+ windSpeed = None
+ windDirection = None
+ windU = None
+ windV = None
+
+ if "GHRSST" in source:
+ sst = value
+
+ if "ASCATB" in source:
+ windU = value
+
+ if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
+ windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir"))
+ if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
+ windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v"))
+ if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
+ windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed"))
+
+ foundNode = {
+ "sea_water_temperature": sst,
+ "sea_water_salinity": sss,
+ "wind_speed": windSpeed,
+ "wind_direction": windDirection,
+ "wind_uv": {
+ "u": windU,
+ "v": windV
+ },
+ "time": ts,
+ "x": lon,
+ "y": lat,
+ "depth": 0,
+ "sea_water_temperature_depth": 0,
+ "source": source,
+ "id": "%s:%s:%s" % (ts, lat, lon)
+ }
+
+ isValidNode = True
+ if "ASCATB" in source and windSpeed is None:
+                    isValidNode = False
+
+ if isValidNode:
+ foundNodes.append(foundNode)
+                    timeDiff = abs((ts * 1000) - searchTime)
+
+ return foundNodes
+
+
+class InsituDatasetProcessor:
+ def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance,
+ radiusTolerance):
+ self.primary = primary
+ self.datasource = datasource
+ self.startTime = startTime
+ self.endTime = endTime
+ self.bbox = bbox
+ self.depth_min = depth_min
+ self.depth_max = depth_max
+ self.platforms = platforms
+ self.timeTolerance = timeTolerance
+ self.radiusTolerance = radiusTolerance
+
+ def start(self):
+ def callback(pageData):
+ self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance)
+
+ fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max,
+ self.platforms, pageCallback=callback)
+
+
+class InsituPageProcessor:
+ def __init__(self):
+ pass
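
The spatial core of MatchupQuery is MatchupContext: observation points are projected from lat/lon into UTM so that scipy's KD-tree can answer radius queries in meters, which is the unit radiusTolerance is expressed in. A self-contained sketch of that indexing step with made-up coordinates (note utm.from_latlon also returns zone information that this scheme discards, so points spanning UTM zones are not strictly comparable):

    import numpy as np
    import utm
    from scipy import spatial

    # "x" is longitude, "y" is latitude, matching the record layout used above
    primary = [{"x": -120.50, "y": 34.10}, {"x": -120.60, "y": 34.20}]

    coords = []
    for p in primary:
        easting, northing = utm.from_latlon(p["y"], p["x"])[:2]
        coords.append((easting, northing, 0.0))  # z pinned to 0, as in MatchupContext
    tree = spatial.KDTree(coords)

    # all primary points within 15 km of a candidate in-situ observation
    easting, northing = utm.from_latlon(34.15, -120.55)[:2]
    for i in tree.query_ball_point(np.array([easting, northing, 0.0]), 15000.0):
        print(primary[i])
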
diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py
new file mode 100644
index 0000000..aa24d91
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/MetadataQuery.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import requests
+
+import BaseDomsHandler
+import config
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import DatasetNotFoundException
+
+
+@nexus_handler
+class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS Metadata Listing"
+ path = "/domsmetadata"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+
+ dataset = computeOptions.get_argument("dataset", None)
+ if dataset is None or len(dataset) == 0:
+ raise Exception("'dataset' parameter not specified")
+
+ metadataUrl = self.__getUrlForDataset(dataset)
+
+ try:
+ r = requests.get(metadataUrl)
+ results = json.loads(r.text)
+ return BaseDomsHandler.DomsQueryResults(results=results)
+ except:
+            raise DatasetNotFoundException("Dataset '%s' not found" % dataset)
+
+ def __getUrlForDataset(self, dataset):
+ datasetSpec = config.getEndpointByName(dataset)
+ if datasetSpec is not None:
+ return datasetSpec["metadataUrl"]
+ else:
+
+ # KMG: NOT a good hack
+ if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
+ dataset = "MUR-JPL-L4-GLOB-v4.1"
+ elif dataset == "SMAP_L2B_SSS":
+ dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
+
+ return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
diff --git a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
new file mode 100644
index 0000000..1b48d14
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import BaseDomsHandler
+import histogramplot
+import mapplot
+import scatterplot
+from webservice.NexusHandler import nexus_handler
+
+
+class PlotTypes:
+ SCATTER = "scatter"
+ MAP = "map"
+ HISTOGRAM = "histogram"
+
+
+@nexus_handler
+class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS Results Plotting"
+ path = "/domsplot"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ id = computeOptions.get_argument("id", None)
+ parameter = computeOptions.get_argument('parameter', 'sst')
+
+ plotType = computeOptions.get_argument("type", PlotTypes.SCATTER)
+
+ normAndCurve = computeOptions.get_boolean_arg("normandcurve", False)
+
+ if plotType == PlotTypes.SCATTER:
+ return scatterplot.createScatterPlot(id, parameter)
+ elif plotType == PlotTypes.MAP:
+ return mapplot.createMapPlot(id, parameter)
+ elif plotType == PlotTypes.HISTOGRAM:
+ return histogramplot.createHistogramPlot(id, parameter, normAndCurve)
+ else:
+ raise Exception("Unsupported plot type '%s' specified." % plotType)
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
new file mode 100644
index 0000000..93358e9
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+import BaseDomsHandler
+import ResultsStorage
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS Resultset Retrieval"
+ path = "/domsresults"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ execution_id = computeOptions.get_argument("id", None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+ except:
+ raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400)
+
+ simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)
+
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
+
+ return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
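
This is the endpoint that the URI_Matchup attributes written by BaseDomsHandler point back to. Retrieving a stored matchup by execution id, sketched against a placeholder host (output=CSV mirrors the URI the CSV formatter advertises; the output parameter is presumably handled by the surrounding webservice layer rather than this class):

    import requests

    r = requests.get("http://localhost:8083/domsresults",
                     params={"id": "00000000-0000-0000-0000-000000000000",
                             "output": "CSV", "simpleResults": "true"})
    print(r.text)
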
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
new file mode 100644
index 0000000..03bbd09
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -0,0 +1,286 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import ConfigParser
+import logging
+import uuid
+from datetime import datetime
+
+import pkg_resources
+from cassandra.cluster import Cluster
+from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
+from cassandra.query import BatchStatement
+from pytz import UTC
+
+
+class AbstractResultsContainer:
+ def __init__(self):
+ self._log = logging.getLogger(__name__)
+ self._log.info("Creating DOMS Results Storage Instance")
+
+ self._session = None
+
+ def __enter__(self):
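+        # Load Cassandra connection settings from the packaged domsconfig.ini
+        # (same keys as domsconfig.ini.default) and open a session.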
+ domsconfig = ConfigParser.RawConfigParser()
+ domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+
+ cassHost = domsconfig.get("cassandra", "host")
+ cassKeyspace = domsconfig.get("cassandra", "keyspace")
+ cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+ cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+
+ dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+ token_policy = TokenAwarePolicy(dc_policy)
+
+ self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy,
+ protocol_version=cassVersion)
+
+ self._session = self._cluster.connect(cassKeyspace)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._cluster.shutdown()
+
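+    # Convert an ISO-8601 UTC timestamp string ("%Y-%m-%dT%H:%M:%SZ") to epoch milliseconds.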
+ def _parseDatetime(self, dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return int(time)
+
+
+class ResultsStorage(AbstractResultsContainer):
+ def __init__(self):
+ AbstractResultsContainer.__init__(self)
+
+ def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
+ if isinstance(execution_id, basestring):
+ execution_id = uuid.UUID(execution_id)
+
+ execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+ self.__insertParams(execution_id, params)
+ self.__insertStats(execution_id, stats)
+ self.__insertResults(execution_id, results)
+ return execution_id
+
+ def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+ if execution_id is None:
+ execution_id = uuid.uuid4()
+
+ cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
+ self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+ return execution_id
+
+ def __insertParams(self, execution_id, params):
+ cql = """INSERT INTO doms_params
+ (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
+ VALUES
+ (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ """
+ self._session.execute(cql, (execution_id,
+ params["primary"],
+ ",".join(params["matchup"]) if type(params["matchup"]) == list else params[
+ "matchup"],
+ params["depthMin"] if "depthMin" in params.keys() else None,
+ params["depthMax"] if "depthMax" in params.keys() else None,
+ int(params["timeTolerance"]),
+ params["radiusTolerance"],
+ params["startTime"],
+ params["endTime"],
+ params["platforms"],
+ params["bbox"],
+ params["parameter"]
+ ))
+
+ def __insertStats(self, execution_id, stats):
+ cql = """
+ INSERT INTO doms_execution_stats
+ (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete)
+ VALUES
+ (%s, %s, %s, %s, %s, %s)
+ """
+ self._session.execute(cql, (
+ execution_id,
+ stats["numGriddedMatched"],
+ stats["numGriddedChecked"],
+ stats["numInSituMatched"],
+ stats["numInSituRecords"],
+ stats["timeToComplete"]
+ ))
+
+ def __insertResults(self, execution_id, results):
+
+ cql = """
+ INSERT INTO doms_data
+ (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
+ VALUES
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """
+ insertStatement = self._session.prepare(cql)
+ batch = BatchStatement()
+
+ for result in results:
+ self.__insertResult(execution_id, None, result, batch, insertStatement)
+
+ self._session.execute(batch)
+
+ def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
+
+ dataMap = self.__buildDataMap(result)
+ result_id = uuid.uuid4()
+ batch.add(insertStatement, (
+ result_id,
+ execution_id,
+ result["id"],
+ primaryId,
+ result["x"],
+ result["y"],
+ result["source"],
+ result["time"],
+ result["platform"] if "platform" in result else None,
+ result["device"] if "device" in result else None,
+ dataMap,
+ 1 if primaryId is None else 0
+ )
+ )
+
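+        # Nested matches reuse the same prepared statement and batch; the batch
+        # is flushed about every 20 rows, and only by the outermost (primary) call.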
+ n = 0
+ if "matches" in result:
+ for match in result["matches"]:
+ self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
+ n += 1
+ if n >= 20:
+ if primaryId is None:
+ self.__commitBatch(batch)
+ n = 0
+
+ if primaryId is None:
+ self.__commitBatch(batch)
+
+ def __commitBatch(self, batch):
+ self._session.execute(batch)
+ batch.clear()
+
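+    # Collect only the numeric measurement fields of a result; structural keys
+    # (id, x, y, time, matches, ...) live in dedicated columns.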
+ def __buildDataMap(self, result):
+ dataMap = {}
+ for name in result:
+ value = result[name]
+ if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type(
+ value) in [float, int]:
+ dataMap[name] = value
+ return dataMap
+
+
+class ResultsRetrieval(AbstractResultsContainer):
+ def __init__(self):
+ AbstractResultsContainer.__init__(self)
+
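+    # Reassemble a stored execution: its query parameters, summary stats, and
+    # the primary rows with matched records nested under "matches".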
+ def retrieveResults(self, execution_id, trim_data=False):
+ if isinstance(execution_id, basestring):
+ execution_id = uuid.UUID(execution_id)
+
+ params = self.__retrieveParams(execution_id)
+ stats = self.__retrieveStats(execution_id)
+ data = self.__retrieveData(execution_id, trim_data=trim_data)
+ return params, stats, data
+
+ def __retrieveData(self, id, trim_data=False):
+ dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
+ self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
+ data = [dataMap[name] for name in dataMap]
+ return data
+
+ def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
+ cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false"
+ rows = self._session.execute(cql, (id,))
+
+ for row in rows:
+ entry = self.__rowToDataEntry(row, trim_data=trim_data)
+ if row.primary_value_id in dataMap:
+ if not "matches" in dataMap[row.primary_value_id]:
+ dataMap[row.primary_value_id]["matches"] = []
+ dataMap[row.primary_value_id]["matches"].append(entry)
+            else:
+                self._log.warning("Match row %s has no primary row in this execution" % str(row))
+
+ def __retrievePrimaryData(self, id, trim_data=False):
+ cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
+ rows = self._session.execute(cql, (id,))
+
+ dataMap = {}
+ for row in rows:
+ entry = self.__rowToDataEntry(row, trim_data=trim_data)
+ dataMap[row.value_id] = entry
+ return dataMap
+
+ def __rowToDataEntry(self, row, trim_data=False):
+ if trim_data:
+ entry = {
+ "x": float(row.x),
+ "y": float(row.y),
+ "source": row.source_dataset,
+ "time": row.measurement_time.replace(tzinfo=UTC)
+ }
+ else:
+ entry = {
+ "id": row.value_id,
+ "x": float(row.x),
+ "y": float(row.y),
+ "source": row.source_dataset,
+ "device": row.device,
+ "platform": row.platform,
+ "time": row.measurement_time.replace(tzinfo=UTC)
+ }
+ for key in row.measurement_values:
+ value = float(row.measurement_values[key])
+ entry[key] = value
+ return entry
+
+ def __retrieveStats(self, id):
+ cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
+ rows = self._session.execute(cql, (id,))
+ for row in rows:
+ stats = {
+ "numGriddedMatched": row.num_gridded_matched,
+ "numGriddedChecked": row.num_gridded_checked,
+ "numInSituMatched": row.num_insitu_matched,
+ "numInSituChecked": row.num_insitu_checked,
+ "timeToComplete": row.time_to_complete
+ }
+ return stats
+
+ raise Exception("Execution not found with id '%s'" % id)
+
+ def __retrieveParams(self, id):
+ cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
+ rows = self._session.execute(cql, (id,))
+ for row in rows:
+ params = {
+ "primary": row.primary_dataset,
+ "matchup": row.matchup_datasets.split(","),
+ "depthMin": row.depth_min,
+ "depthMax": row.depth_max,
+ "timeTolerance": row.time_tolerance,
+ "radiusTolerance": row.radius_tolerance,
+ "startTime": row.start_time.replace(tzinfo=UTC),
+ "endTime": row.end_time.replace(tzinfo=UTC),
+ "platforms": row.platforms,
+ "bbox": row.bounding_box,
+ "parameter": row.parameter
+ }
+ return params
+
+ raise Exception("Execution not found with id '%s'" % id)
diff --git a/analysis/webservice/algorithms/doms/StatsQuery.py b/analysis/webservice/algorithms/doms/StatsQuery.py
new file mode 100644
index 0000000..f5ac765
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/StatsQuery.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import BaseDomsHandler
+import datafetch
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
+from webservice.NexusHandler import nexus_handler
+
+
+@nexus_handler
+class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS In-Situ Stats Lookup"
+ path = "/domsstats"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ source = computeOptions.get_argument("source", None)
+ startTime = computeOptions.get_argument("s", None)
+ endTime = computeOptions.get_argument("e", None)
+ bbox = computeOptions.get_argument("b", None)
+ timeTolerance = computeOptions.get_float_arg("tt")
+ depth_min = computeOptions.get_float_arg("depthMin", default=None)
+ depth_max = computeOptions.get_float_arg("depthMax", default=None)
+ radiusTolerance = computeOptions.get_float_arg("rt")
+ platforms = computeOptions.get_argument("platforms", None)
+
+ source1 = self.getDataSourceByName(source)
+ if source1 is None:
+ raise Exception("Source '%s' not found" % source)
+
+ count, bounds = datafetch.getCount(source1, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+ args = {
+ "source": source,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "timeTolerance": timeTolerance,
+ "depthMin": depth_min,
+ "depthMax": depth_max,
+ "radiusTolerance": radiusTolerance,
+ "platforms": platforms
+ }
+
+ return BaseDomsHandler.DomsQueryResults(results={}, args=args, details={}, bounds=bounds, count=count,
+ computeOptions=None)
diff --git a/analysis/webservice/algorithms/doms/ValuesQuery.py b/analysis/webservice/algorithms/doms/ValuesQuery.py
new file mode 100644
index 0000000..d766c7b
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ValuesQuery.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from pytz import timezone
+
+import BaseDomsHandler
+import datafetch
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
+from webservice.NexusHandler import nexus_handler
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+
+
+@nexus_handler
+class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS In-Situ Value Lookup"
+ path = "/domsvalues"
+ description = ""
+ params = {}
+ singleton = True
+
+ def __init__(self):
+ BaseHandler.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ source = computeOptions.get_argument("source", None)
+ startTime = computeOptions.get_start_datetime()
+ endTime = computeOptions.get_end_datetime()
+ bbox = computeOptions.get_argument("b", None)
+ timeTolerance = computeOptions.get_float_arg("tt")
+ depth_min = computeOptions.get_float_arg("depthMin", default=None)
+ depth_max = computeOptions.get_float_arg("depthMax", default=None)
+ radiusTolerance = computeOptions.get_float_arg("rt")
+ platforms = computeOptions.get_argument("platforms", "")
+
+ source1 = self.getDataSourceByName(source)
+ if source1 is None:
+ raise Exception("Source '%s' not found" % source)
+
+ values, bounds = datafetch.getValues(source1, startTime.strftime('%Y-%m-%dT%H:%M:%SZ'),
+ endTime.strftime('%Y-%m-%dT%H:%M:%SZ'), bbox, depth_min, depth_max,
+ platforms, placeholders=True)
+ count = len(values)
+
+ args = {
+ "source": source,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "timeTolerance": timeTolerance,
+ "depthMin": depth_min,
+ "depthMax": depth_max,
+ "radiusTolerance": radiusTolerance,
+ "platforms": platforms
+ }
+
+ return BaseDomsHandler.DomsQueryResults(results=values, args=args, bounds=bounds, details={}, count=count,
+ computeOptions=None)
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
new file mode 100644
index 0000000..d5a8e24
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import BaseDomsHandler
+import DatasetListQuery
+import DomsInitialization
+import MatchupQuery
+import MetadataQuery
+import ResultsPlotQuery
+import ResultsRetrieval
+import ResultsStorage
+import StatsQuery
+import ValuesQuery
+import config
+import datafetch
+import fetchedgeimpl
+import geo
+import insitusubset
+import subsetter
+import values
+import workerthread
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
new file mode 100644
index 0000000..ff492e8
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ENDPOINTS = [
+ {
+ "name": "samos",
+ "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 1000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
+ },
+ {
+ "name": "spurs",
+ "url": "https://doms.jpl.nasa.gov/ws/search/spurs",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 25000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
+ },
+ {
+ "name": "icoads",
+ "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 1000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
+ },
+ {
+ "name": "spurs2",
+ "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 25000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
+ }
+]
+
+METADATA_LINKS = {
+ "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2",
+ "icoads": "https://rda.ucar.edu/datasets/ds548.1/",
+ "spurs": "https://podaac.jpl.nasa.gov/spurs"
+}
+
+import os
+
+try:
+ env = os.environ['ENV']
+ if env == 'dev':
+ ENDPOINTS = [
+ {
+ "name": "samos",
+ "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 1000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
+ },
+ {
+ "name": "spurs",
+ "url": "http://127.0.0.1:8890/ws/search/spurs",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 25000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
+ },
+ {
+ "name": "icoads",
+ "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 1000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
+ },
+ {
+ "name": "spurs2",
+ "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
+ "fetchParallel": True,
+ "fetchThreads": 8,
+ "itemsPerPage": 25000,
+ "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
+ }
+ ]
+ METADATA_LINKS = {
+ "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2",
+ "icoads": "https://rda.ucar.edu/datasets/ds548.1/",
+ "spurs": "https://podaac.jpl.nasa.gov/spurs"
+ }
+except KeyError:
+ pass
+
+
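+# Case-insensitive lookup of an endpoint definition by name, e.g.
+# getEndpointByName("SPURS") returns the spurs entry above.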
+def getEndpointByName(name):
+ for endpoint in ENDPOINTS:
+ if endpoint["name"].upper() == name.upper():
+ return endpoint
+ return None
diff --git a/analysis/webservice/algorithms/doms/datafetch.py b/analysis/webservice/algorithms/doms/datafetch.py
new file mode 100644
index 0000000..3fc3917
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/datafetch.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import fetchedgeimpl
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ data = []
+ for endpoint in endpoints:
+ dataSingleSource = __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+ data = data + dataSingleSource
+ return data
+
+
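+# Accepts a single endpoint dict or a list of them; a list is fetched
+# sequentially and the results are concatenated.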
+def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ if type(endpoint) == list:
+ return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+ else:
+ return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+ return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders)
+
+
+if __name__ == "__main__":
+ pass
diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini.default b/analysis/webservice/algorithms/doms/domsconfig.ini.default
new file mode 100644
index 0000000..d1814bf
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/domsconfig.ini.default
@@ -0,0 +1,15 @@
+[cassandra]
+host=sdap-cassandra
+port=9042
+keyspace=doms
+local_datacenter=datacenter1
+protocol_version=3
+dc_policy=DCAwareRoundRobinPolicy
+
+
+[cassandraDD]
+host=128.149.115.178,128.149.115.173,128.149.115.176,128.149.115.175,128.149.115.172,128.149.115.174,128.149.115.177
+keyspace=doms
+local_datacenter=B600
+protocol_version=3
+
diff --git a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
new file mode 100644
index 0000000..70cf14e
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import traceback
+from datetime import datetime
+from multiprocessing.pool import ThreadPool
+
+import requests
+
+import geo
+import values
+from webservice.webmodel import NexusProcessingException
+
+
+def __parseDatetime(dtString):
+ dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
+ epoch = datetime.utcfromtimestamp(0)
+ time = (dt - epoch).total_seconds() * 1000.0
+ return time
+
+
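+# Accepts either a WKT-style "Point(lon lat)" string or a bare "lat,lon" pair
+# and returns (latitude, longitude).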
+def __parseLocation(locString):
+ if "Point" in locString:
+ locString = locString[6:-1]
+
+ if "," in locString:
+ latitude = float(locString.split(",")[0])
+ longitude = float(locString.split(",")[1])
+ else:
+ latitude = float(locString.split(" ")[1])
+ longitude = float(locString.split(" ")[0])
+
+ return (latitude, longitude)
+
+
+def __resultRawToUsable(resultdict):
+ resultdict["time"] = __parseDatetime(resultdict["time"])
+ latitude, longitude = __parseLocation(resultdict["point"])
+
+ resultdict["x"] = longitude
+ resultdict["y"] = latitude
+
+ if "id" not in resultdict and "metadata" in resultdict:
+ resultdict["id"] = resultdict["metadata"]
+
+ resultdict["id"] = "id-%s" % resultdict["id"]
+
+ if "device" in resultdict:
+ resultdict["device"] = values.getDeviceById(resultdict["device"])
+
+ if "platform" in resultdict:
+ resultdict["platform"] = values.getPlatformById(resultdict["platform"])
+
+ if "mission" in resultdict:
+ resultdict["mission"] = values.getMissionById(resultdict["mission"])
+
+ if "sea_surface_temperature" in resultdict:
+ resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
+ del resultdict["sea_surface_temperature"]
+
+ return resultdict
+
+
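+# GET the URL, retrying up to maxtries times on a non-200 status or a response
+# body that fails to parse as JSON.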
+def __fetchJson(url, params, trycount=1, maxtries=5):
+ if trycount > maxtries:
+ raise Exception("Maximum retries attempted.")
+ if trycount > 1:
+ print "Retry #", trycount
+ r = requests.get(url, params=params, timeout=500.000)
+
+ print r.url
+
+ if r.status_code != 200:
+ return __fetchJson(url, params, trycount + 1, maxtries)
+ try:
+ results = json.loads(r.text)
+ return results
+    except ValueError:  # response body was not valid JSON; retry
+ return __fetchJson(url, params, trycount + 1, maxtries)
+
+
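+# Issue one paged query against an edge endpoint, returning the page's results
+# along with paging metadata and a bounds accumulator.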
+def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0,
+ platforms=None,
+ pageCallback=None):
+ params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
+ "startIndex": startIndex, "stats": "true"}
+
+ if depth_min is not None:
+ params['minDepth'] = depth_min
+ if depth_max is not None:
+ params['maxDepth'] = depth_max
+
+ if platforms is not None:
+ params["platform"] = platforms.split(",")
+
+ resultsRaw = __fetchJson(endpoint["url"], params)
+    # Inverted extremes so the first tested coordinate defines the box.
+    boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+    # When no individual results are returned (e.g. count-only queries with
+    # itemsPerPage=0), the aggregate lat/lon stats still provide usable bounds,
+    # so fold them in before the empty-results early return below.
+    if "stats_fields" in resultsRaw:
+        stats = resultsRaw["stats_fields"]
+        if "lat" in stats and "lon" in stats:
+            boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
+                                         west=stats['lon']['min'], east=stats['lon']['max'])
+
+    if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0:  # Double-sanity check
+        return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer
+
+    try:
+        results = []
+        for resultdict in resultsRaw["results"]:
+            result = __resultRawToUsable(resultdict)
+            result["source"] = endpoint["name"]
+            boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
+            results.append(result)
+
+ if pageCallback is not None:
+ pageCallback(results)
+
+ '''
+ If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+ '''
+ if pageCallback is None:
+ return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+ resultsRaw["itemsPerPage"]), boundsConstrainer
+ else:
+ return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
+ resultsRaw["itemsPerPage"]), boundsConstrainer
+ except:
+ print "Invalid or missing JSON in response."
+ traceback.print_exc()
+ raise NexusProcessingException(reason="Invalid or missing JSON in response.")
+ # return [], 0, startIndex, itemsPerPage, boundsConstrainer
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+ startIndex = 0
+ pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+ endTime, bbox,
+ depth_min, depth_max, 0,
+ startIndex, platforms)
+ return totalResults, boundsConstrainer
+
+
+def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
+ results = []
+ startIndex = 0
+
+ mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
+
+    # The first request is not parallelized so we can learn the total result count, the server-forced items per page, etc.
+ pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
+ endTime, bbox,
+ depth_min, depth_max,
+ endpoint["itemsPerPage"],
+ startIndex, platforms,
+ pageCallback)
+ results = results + pageResults
+ mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)
+
+ pool = ThreadPool(processes=endpoint["fetchThreads"])
+ mpResults = [pool.apply_async(__doQuery, args=(
+ endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
+ range(len(pageResults), totalResults, itemsPerPageR)]
+ pool.close()
+ pool.join()
+
+ '''
+ If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
+ '''
+ if pageCallback is None:
+ mpResults = [p.get() for p in mpResults]
+ for mpResult in mpResults:
+ results = results + mpResult[0]
+ mainBoundsConstrainer.testOtherConstrainer(mpResult[4])
+
+ return results, mainBoundsConstrainer
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+ results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+ if placeholders:
+ trimmedResults = []
+ for item in results:
+ depth = None
+ if "depth" in item:
+ depth = item["depth"]
+ if "sea_water_temperature_depth" in item:
+ depth = item["sea_water_temperature_depth"]
+
+ trimmedItem = {
+ "x": item["x"],
+ "y": item["y"],
+ "source": item["source"],
+ "time": item["time"],
+ "device": item["device"] if "device" in item else None,
+ "platform": item["platform"],
+ "depth": depth
+ }
+ trimmedResults.append(trimmedItem)
+
+ results = trimmedResults
+
+ return results, boundsConstrainer
diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py
new file mode 100644
index 0000000..3323f57
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/geo.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+
+MEAN_RADIUS_EARTH_METERS = 6371010.0
+EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0
+POLAR_RADIUS_EARTH_METERS = 6356752.0
+FLATTENING_EARTH = 298.257223563
+MEAN_RADIUS_EARTH_MILES = 3958.8
+
+
+class DistanceUnit(object):
+ METERS = 0
+ MILES = 1
+
+
+# Haversine implementation for great-circle distances between two points
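+# Note: in this module, x arguments are latitudes and y arguments are longitudes.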
+def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS):
+ if units == DistanceUnit.METERS:
+ R = MEAN_RADIUS_EARTH_METERS
+ elif units == DistanceUnit.MILES:
+ R = MEAN_RADIUS_EARTH_MILES
+ else:
+ raise Exception("Invalid units specified")
+ x0r = x0 * (math.pi / 180.0) # To radians
+ x1r = x1 * (math.pi / 180.0) # To radians
+ xd = (x1 - x0) * (math.pi / 180.0)
+ yd = (y1 - y0) * (math.pi / 180.0)
+
+ a = math.sin(xd / 2.0) * math.sin(xd / 2.0) + \
+ math.cos(x0r) * math.cos(x1r) * \
+ math.sin(yd / 2.0) * math.sin(yd / 2.0)
+ c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
+ d = R * c
+ return d
+
+
+# Equirectangular approximation for when performance is key. Better at smaller distances
+def equirectangularApprox(x0, y0, x1, y1):
+ R = 6371000.0 # Meters
+ x0r = x0 * (math.pi / 180.0) # To radians
+ x1r = x1 * (math.pi / 180.0)
+ y0r = y0 * (math.pi / 180.0)
+ y1r = y1 * (math.pi / 180.0)
+
+ x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0)
+ y = x1r - x0r
+ d = math.sqrt(x * x + y * y) * R
+ return d
+
+
+class BoundingBox(object):
+
+ def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+ if asString is not None:
+ bboxParts = asString.split(",")
+ self.west = float(bboxParts[0])
+ self.south = float(bboxParts[1])
+ self.east = float(bboxParts[2])
+ self.north = float(bboxParts[3])
+ else:
+ self.north = north
+ self.south = south
+ self.west = west
+ self.east = east
+
+ def toString(self):
+ return "%s,%s,%s,%s" % (self.west, self.south, self.east, self.north)
+
+ def toMap(self):
+ return {
+ "xmin": self.west,
+ "xmax": self.east,
+ "ymin": self.south,
+ "ymax": self.north
+ }
+
+
+'''
+    Accumulates the bounding box of every coordinate tested against it.
+    Initialize with inverted extremes (north=-90, south=90, west=180, east=-180)
+    so the first tested coordinate defines the box.
+'''
+
+
+class BoundsConstrainer(BoundingBox):
+
+ def __init__(self, north=None, south=None, west=None, east=None, asString=None):
+ BoundingBox.__init__(self, north, south, west, east, asString)
+
+ def testNorth(self, v):
+ if v is None:
+ return
+ self.north = max([self.north, v])
+
+ def testSouth(self, v):
+ if v is None:
+ return
+ self.south = min([self.south, v])
+
+ def testEast(self, v):
+ if v is None:
+ return
+ self.east = max([self.east, v])
+
+ def testWest(self, v):
+ if v is None:
+ return
+ self.west = min([self.west, v])
+
+ def testCoords(self, north=None, south=None, west=None, east=None):
+ self.testNorth(north)
+ self.testSouth(south)
+ self.testWest(west)
+ self.testEast(east)
+
+ def testOtherConstrainer(self, other):
+ self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east)
diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py
new file mode 100644
index 0000000..1e06b66
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/histogramplot.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import matplotlib
+
+# Force the non-interactive Agg backend; this must be done before pyplot is imported.
+matplotlib.use('Agg')
+
+import matplotlib.mlab as mlab
+import matplotlib.pyplot as plt
+import numpy as np
+
+import BaseDomsHandler
+import ResultsStorage
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$C)",
+ "sss": "(g/L)"
+}
+
+
+class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+ def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
+ computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds,
+ count=count, computeOptions=computeOptions, executionId=executionId)
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__x = x
+ self.__parameter = parameter
+ self.__plot = plot
+
+ def toImage(self):
+ return self.__plot
+
+
+def render(d, x, primary, secondary, parameter, norm_and_curve=False):
+ fig, ax = plt.subplots()
+ fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold')
+
+ n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75)
+
+ if norm_and_curve:
+ mean = np.mean(x)
+ variance = np.var(x)
+ sigma = np.sqrt(variance)
+ y = mlab.normpdf(bins, mean, sigma)
+ l = plt.plot(bins, y, 'r--', linewidth=1)
+
+ ax.set_title('n = %d' % len(x))
+
+ units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ ax.set_xlabel("%s - %s %s" % (primary, secondary, units))
+
+ if norm_and_curve:
+ ax.set_ylabel("Probability per unit difference")
+ else:
+ ax.set_ylabel("Frequency")
+
+ plt.grid(True)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ d['plot'] = sio.getvalue()
+
+
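+# Render in a child process so matplotlib state stays isolated from the serving
+# process; the PNG bytes come back through a Manager dict.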
+def renderAsync(x, primary, secondary, parameter, norm_and_curve):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+def createHistogramPlot(id, parameter, norm_and_curve=False):
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ x = createHistTable(data, secondary, parameter)
+
+ plot = renderAsync(x, primary, secondary, parameter, norm_and_curve)
+
+ r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary,
+ args=params, details=stats,
+ bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
+
+
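+# Build the difference series (primary value minus matched secondary value) for
+# every match whose source is the selected secondary dataset.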
+def createHistTable(results, secondary, parameter):
+ x = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in results:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ x.append((a - b))
+
+ return x
diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py
new file mode 100644
index 0000000..7f60e99
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/insitusubset.py
@@ -0,0 +1,263 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import StringIO
+import csv
+import json
+import logging
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.webmodel import NexusProcessingException, NoDataException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS In Situ Subsetter"
+ path = "/domsinsitusubset"
+ description = "Subset a DOMS in situ source given the search domain."
+
+ params = [
+ {
+ "name": "source",
+ "type": "comma-delimited string",
+ "description": "The in situ Dataset to be sub-setted",
+ "required": "true",
+ "sample": "spurs"
+ },
+ {
+ "name": "parameter",
+ "type": "string",
+ "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
+ "required": "false",
+ "default": "All",
+ "sample": "sss"
+ },
+ {
+ "name": "startTime",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+ "required": "true",
+ "sample": "2013-10-21T00:00:00Z"
+ },
+ {
+ "name": "endTime",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
+ "required": "true",
+ "sample": "2013-10-31T23:59:59Z"
+ },
+ {
+ "name": "b",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ "required": "true",
+ "sample": "-30,15,-45,30"
+ },
+ {
+ "name": "depthMin",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax",
+ "required": "false",
+ "default": "No limit",
+ "sample": "0"
+ },
+ {
+ "name": "depthMax",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin",
+ "required": "false",
+ "default": "No limit",
+ "sample": "5"
+ },
+ {
+ "name": "platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for subset consideration",
+ "required": "false",
+ "default": "All",
+ "sample": "1,2,3,4,5,6,7,8,9"
+ },
+ {
+ "name": "output",
+ "type": "string",
+ "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
+ "required": "false",
+ "default": "JSON",
+ "sample": "CSV"
+ }
+ ]
+ singleton = True
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ source_name = request.get_argument('source', None)
+ if source_name is None or source_name.strip() == '':
+ raise NexusProcessingException(reason="'source' argument is required", code=400)
+
+ parameter_s = request.get_argument('parameter', None)
+ if parameter_s not in ['sst', 'sss', 'wind', None]:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ platforms = request.get_argument('platforms', None)
+ if platforms is not None:
+ try:
+ p_validation = platforms.split(',')
+ p_validation = [int(p) for p in p_validation]
+ del p_validation
+            except ValueError:
+ raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms
+
+ def calc(self, request, **args):
+
+ source_name, parameter_s, start_time, end_time, bounding_polygon, \
+ depth_min, depth_max, platforms = self.parse_arguments(request)
+
+ with requests.session() as edge_session:
+ edge_results = query_edge(source_name, parameter_s, start_time, end_time,
+ ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ platforms, depth_min, depth_max, edge_session)['results']
+
+ if len(edge_results) == 0:
+ raise NoDataException
+ return InSituSubsetResult(results=edge_results)
+
+
+class InSituSubsetResult(object):
+ def __init__(self, results):
+ self.results = results
+
+ def toJson(self):
+ return json.dumps(self.results, indent=4)
+
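+    # CSV columns come from the first result's keys; all results are assumed to
+    # share the same fields.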
+ def toCSV(self):
+ fieldnames = sorted(next(iter(self.results)).keys())
+
+ csv_mem_file = StringIO.StringIO()
+ try:
+ writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)
+
+ writer.writeheader()
+ writer.writerows(self.results)
+ csv_out = csv_mem_file.getvalue()
+ finally:
+ csv_mem_file.close()
+
+ return csv_out
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000,
+ startIndex=0, stats=True):
+ log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')
+ try:
+ startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ platform = platform.split(',')
+ except AttributeError:
+ # Assume we were passed a list
+ pass
+
+ params = {"startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "minDepth": depth_min,
+ "maxDepth": depth_max,
+ "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+ if variable:
+ params['variable'] = variable
+ if platform:
+ params['platform'] = platform
+
+ edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+ edge_request.raise_for_status()
+ edge_response = json.loads(edge_request.text)
+
+ # Get all edge results
+ next_page_url = edge_response.get('next', None)
+ while next_page_url is not None:
+ log.debug("requesting %s" % next_page_url)
+ edge_page_request = session.get(next_page_url)
+
+ edge_page_request.raise_for_status()
+ edge_page_response = json.loads(edge_page_request.text)
+
+ edge_response['results'].extend(edge_page_response['results'])
+
+ next_page_url = edge_page_response.get('next', None)
+
+ return edge_response
diff --git a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py
new file mode 100644
index 0000000..3af85d3
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/mapplot.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import matplotlib
+
+# Force the non-interactive Agg backend; this must be done before pyplot is imported.
+matplotlib.use('Agg')
+
+import matplotlib.pyplot as plt
+import numpy as np
+from mpl_toolkits.basemap import Basemap
+
+import BaseDomsHandler
+import ResultsStorage
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$ C)",
+ "sss": "(g/L)"
+}
+
+
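+# Pad the shorter axis so the longitude and latitude extents are equal, keeping
+# the rendered map square.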
+def __square(minLon, maxLon, minLat, maxLat):
+ if maxLat - minLat > maxLon - minLon:
+ a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0
+ minLon -= a
+ maxLon += a
+ elif maxLon - minLon > maxLat - minLat:
+ a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0
+ minLat -= a
+ maxLat += a
+
+ return minLon, maxLon, minLat, maxLat
+
+
+def render(d, lats, lons, z, primary, secondary, parameter):
+ fig = plt.figure()
+ ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
+
+ ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+ # ax.set_ylabel('Latitude')
+ # ax.set_xlabel('Longitude')
+
+ minLatA = np.min(lats)
+ maxLatA = np.max(lats)
+ minLonA = np.min(lons)
+ maxLonA = np.max(lons)
+
+ minLat = minLatA - (abs(maxLatA - minLatA) * 0.1)
+ maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1)
+
+ minLon = minLonA - (abs(maxLonA - minLonA) * 0.1)
+ maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1)
+
+ minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat)
+
+ # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l')
+ m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat,
+ resolution='l')
+
+ m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10)
+ m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10)
+
+ m.drawcoastlines()
+ m.drawmapboundary(fill_color='#99ffff')
+ m.fillcontinents(color='#cc9966', lake_color='#99ffff')
+
+ # lats, lons = np.meshgrid(lats, lons)
+
+ masked_array = np.ma.array(z, mask=np.isnan(z))
+ z = masked_array
+
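+    # Scale marker sizes linearly into the range [10, 30] based on each value's
+    # position between the min and max of z.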
+ values = np.zeros(len(z))
+ for i in range(0, len(z)):
+ values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10
+
+ x, y = m(lons, lats)
+
+ im1 = m.scatter(x, y, values)
+
+ im1.set_array(z)
+ cb = m.colorbar(im1)
+
+ units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ cb.set_label("Difference %s" % units)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ plot = sio.getvalue()
+ if d is not None:
+ d['plot'] = plot
+ return plot
+
+
+class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+ def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
+ computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args,
+ details=details, bounds=bounds, count=count,
+ computeOptions=computeOptions, executionId=executionId)
+ self.__lats = lats
+ self.__lons = lons
+ self.__z = np.array(z)
+ self.__parameter = parameter
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__plot = plot
+
+ def toImage(self):
+ return self.__plot
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+def createMapPlot(id, parameter):
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ lats = []
+ lons = []
+ z = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in data:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ z.append((a - b))
+ z.append((a - b))
+ else:
+ z.append(1.0)
+ z.append(1.0)
+ lats.append(entry["y"])
+ lons.append(entry["x"])
+ lats.append(match["y"])
+ lons.append(match["x"])
+
+ plot = renderAsync(lats, lons, z, primary, secondary, parameter)
+ r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary,
+ args=params,
+ details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py
new file mode 100644
index 0000000..2ff57ee
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/scatterplot.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import string
+from cStringIO import StringIO
+from multiprocessing import Process, Manager
+
+import matplotlib
+
+# Force the non-interactive Agg backend; this must be done before pyplot is imported.
+matplotlib.use('Agg')
+
+import matplotlib.pyplot as plt
+
+import BaseDomsHandler
+import ResultsStorage
+
+PARAMETER_TO_FIELD = {
+ "sst": "sea_water_temperature",
+ "sss": "sea_water_salinity"
+}
+
+PARAMETER_TO_UNITS = {
+ "sst": "($^\circ$ C)",
+ "sss": "(g/L)"
+}
+
+
+def render(d, x, y, z, primary, secondary, parameter):
+ fig, ax = plt.subplots()
+
+ ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
+
+    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
+ ax.set_ylabel("%s %s" % (secondary, units))
+ ax.set_xlabel("%s %s" % (primary, units))
+
+ ax.scatter(x, y)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ d['plot'] = sio.getvalue()
+
+
+class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults):
+
+ def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
+ computeOptions=None, executionId=None, plot=None):
+ BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds,
+ count=count, computeOptions=computeOptions, executionId=executionId)
+ self.__primary = primary
+ self.__secondary = secondary
+ self.__x = x
+ self.__y = y
+ self.__z = z
+ self.__parameter = parameter
+ self.__plot = plot
+
+ def toImage(self):
+ return self.__plot
+
+
+def renderAsync(x, y, z, primary, secondary, parameter):
+ manager = Manager()
+ d = manager.dict()
+ p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
+ p.start()
+ p.join()
+ return d['plot']
+
+
+def createScatterPlot(id, parameter):
+ with ResultsStorage.ResultsRetrieval() as storage:
+ params, stats, data = storage.retrieveResults(id)
+
+ primary = params["primary"]
+ secondary = params["matchup"][0]
+
+ x, y, z = createScatterTable(data, secondary, parameter)
+
+ plot = renderAsync(x, y, z, primary, secondary, parameter)
+
+ r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary,
+ args=params, details=stats,
+ bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
+ return r
+
+
+def createScatterTable(results, secondary, parameter):
+ x = []
+ y = []
+ z = []
+
+ field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
+
+ for entry in results:
+ for match in entry["matches"]:
+ if match["source"] == secondary:
+ if field in entry and field in match:
+ a = entry[field]
+ b = match[field]
+ x.append(a)
+ y.append(b)
+ z.append(a - b)
+
+ return x, y, z
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
new file mode 100644
index 0000000..67a2276
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -0,0 +1,260 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+import zipfile
+from datetime import datetime
+
+import requests
+
+import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def is_blank(my_string):
+    return not (my_string and my_string.strip())
+
+
+@nexus_handler
+class DomsSubsetterHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = "DOMS Subsetter"
+ path = "/domssubset"
+ description = "Subset DOMS sources given the search domain"
+
+ params = {
+ "dataset": {
+ "name": "NEXUS Dataset",
+ "type": "string",
+            "description": "The NEXUS dataset. Optional, but at least one of 'dataset' or 'insitu' is required"
+ },
+ "insitu": {
+ "name": "In Situ sources",
+ "type": "comma-delimited string",
+            "description": "The in situ source(s). Optional, but at least one of 'dataset' or 'insitu' is required"
+ },
+ "parameter": {
+ "name": "Data Parameter",
+ "type": "string",
+ "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "depthMin": {
+ "name": "Minimum Depth",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax. Optional"
+ },
+ "depthMax": {
+ "name": "Maximum Depth",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin. Optional"
+ },
+ "platforms": {
+ "name": "Platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for subset consideration. Optional"
+ },
+ "output": {
+ "name": "Output",
+ "type": "string",
+ "description": "Output type. Only 'ZIP' is currently supported. Required"
+ }
+ }
+ singleton = True
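+
+    # Example request, using placeholder dataset/source names:
+    #   /domssubset?dataset=MY_SST_L4&insitu=samos,icoads&parameter=sst
+    #       &startTime=2018-01-01T00:00:00Z&endTime=2018-01-08T00:00:00Z
+    #       &b=-98,17.8,-81.5,30.8&output=ZIP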
+
+ def __init__(self):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ primary_ds_name = request.get_argument('dataset', None)
+ matchup_ds_names = request.get_argument('insitu', None)
+
+ if is_blank(primary_ds_name) and is_blank(matchup_ds_names):
+            raise NexusProcessingException(reason="At least one of the 'dataset' or 'insitu' arguments is required",
+                                           code=400)
+
+ if matchup_ds_names is not None:
+ try:
+ matchup_ds_names = matchup_ds_names.split(',')
+ except:
+                raise NexusProcessingException(reason="'insitu' argument should be a comma-separated list", code=400)
+
+ parameter_s = request.get_argument('parameter', None)
+ if parameter_s not in ['sst', 'sss', 'wind']:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
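+        # Both timestamps use the fixed-width "%Y-%m-%dT%H:%M:%SZ" layout, so
+        # string comparison here matches chronological ordering.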
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ platforms = request.get_argument('platforms', None)
+ if platforms is not None:
+            try:
+                p_validation = [int(p) for p in platforms.split(',')]
+                del p_validation
+            except ValueError:
+                raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+ bounding_polygon, depth_min, depth_max, platforms
+
+ def calc(self, request, **args):
+
+ primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
+ bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request)
+
+ primary_url = "https://doms.jpl.nasa.gov/datainbounds"
+ primary_params = {
+ 'ds': primary_ds_name,
+ 'parameter': parameter_s,
+ 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ 'startTime': start_time,
+ 'endTime': end_time,
+ 'output': "CSV"
+ }
+
+ matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset"
+ matchup_params = {
+ 'source': None,
+ 'parameter': parameter_s,
+ 'startTime': start_time,
+ 'endTime': end_time,
+ 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
+ 'depthMin': depth_min,
+ 'depthMax': depth_max,
+ 'platforms': platforms,
+ 'output': 'CSV'
+ }
+
+ primary_temp_file_path = None
+ matchup_downloads = None
+
+ with requests.session() as session:
+
+            if not is_blank(primary_ds_name):
+                # Download primary
+                primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv')
+                os.close(primary_temp_file)  # mkstemp returns an open descriptor; keep only the path
+                download_file(primary_url, primary_temp_file_path, session, params=primary_params)
+
+            if matchup_ds_names:  # may be None when 'insitu' was not supplied
+                # Download matchup
+                matchup_downloads = {}
+                for matchup_ds in matchup_ds_names:
+                    fd, matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv')
+                    os.close(fd)
+                    matchup_params['source'] = matchup_ds
+                    download_file(matchup_url, matchup_downloads[matchup_ds], session, params=matchup_params)
+
+ # Zip downloads
+ date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"),
+ datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"))
+ bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds
+ zip_dir = tempfile.mkdtemp()
+ zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds)
+ with zipfile.ZipFile(zip_path, 'w') as my_zip:
+ if primary_temp_file_path:
+ my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds))
+            if matchup_downloads:
+                for matchup_ds, download_path in matchup_downloads.iteritems():
+                    my_zip.write(download_path, arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds))
+
+ # Clean up
+ if primary_temp_file_path:
+ os.remove(primary_temp_file_path)
+        if matchup_downloads:
+            for matchup_ds, download_path in matchup_downloads.iteritems():
+                os.remove(download_path)
+
+ return SubsetResult(zip_path)
+
+
+class SubsetResult(object):
+ def __init__(self, zip_path):
+ self.zip_path = zip_path
+
+ def toJson(self):
+ raise NotImplementedError
+
+ def toZip(self):
+ with open(self.zip_path, 'rb') as zip_file:
+ zip_contents = zip_file.read()
+
+ return zip_contents
+
+ def cleanup(self):
+ os.remove(self.zip_path)
+
+
+def download_file(url, filepath, session, params=None):
+    r = session.get(url, params=params, stream=True)
+    r.raise_for_status()  # fail loudly rather than writing an error page into the CSV
+    with open(filepath, 'wb') as f:
+        for chunk in r.iter_content(chunk_size=1024):
+            if chunk:  # filter out keep-alive new chunks
+                f.write(chunk)
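+
+
+# A minimal usage sketch (URL and paths are illustrative):
+#   with requests.session() as session:
+#       download_file("https://example.com/subset.csv", "/tmp/subset.csv", session)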
diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py
new file mode 100644
index 0000000..c47d450
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/values.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+PLATFORMS = [
+ {"id": 1, "desc": "ship"},
+ {"id": 2, "desc": "moored surface buoy"},
+ {"id": 3, "desc": "drifting surface float"},
+ {"id": 4, "desc": "drifting subsurface profiling float"},
+ {"id": 5, "desc": "autonomous underwater vehicle"},
+ {"id": 6, "desc": "offshore structure"},
+ {"id": 7, "desc": "coastal structure"},
+ {"id": 8, "desc": "towed unmanned submersible"},
+ {"id": 9, "desc": "orbiting satellite"}
+]
+
+DEVICES = [
+ {"id": 1, "desc": "bathythermographs"},
+ {"id": 2, "desc": "discrete water samplers"},
+ {"id": 3, "desc": "CTD"},
+ {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"},
+ {"id": 5, "desc": "radiometers"},
+ {"id": 6, "desc": "scatterometers"}
+]
+
+MISSIONS = [
+ {"id": 1, "desc": "SAMOS"},
+ {"id": 2, "desc": "ICOADS"},
+ {"id": 3, "desc": "Aquarius"},
+ {"id": 4, "desc": "SPURS1"}
+]
+
+
+def getDescById(value_list, item_id):
+    for item in value_list:
+        if item["id"] == item_id:
+            return item["desc"]
+    return item_id
+
+
+def getPlatformById(item_id):
+    return getDescById(PLATFORMS, item_id)
+
+
+def getDeviceById(item_id):
+    return getDescById(DEVICES, item_id)
+
+
+def getMissionById(item_id):
+    return getDescById(MISSIONS, item_id)
+
+
+def getDescByListNameAndId(listName, item_id):
+    if listName.upper() == "PLATFORM":
+        return getPlatformById(item_id)
+    elif listName.upper() == "DEVICE":
+        return getDeviceById(item_id)
+    elif listName.upper() == "MISSION":
+        return getMissionById(item_id)
+    else:
+        raise Exception("Invalid list name specified ('%s')" % listName)
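+
+
+# Example lookups (ids come from the tables above; unknown ids fall through unchanged):
+#   getDescByListNameAndId("platform", 2)  # -> "moored surface buoy"
+#   getPlatformById(42)                    # -> 42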
diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py
new file mode 100644
index 0000000..7639c00
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/workerthread.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import time
+
+
+class WorkerThread(threading.Thread):
+
+ def __init__(self, method, params):
+ threading.Thread.__init__(self)
+ self.method = method
+ self.params = params
+ self.completed = False
+ self.results = None
+
+ def run(self):
+ self.results = self.method(*self.params)
+ self.completed = True
+
+
+def __areAllComplete(threads):
+ for thread in threads:
+ if not thread.completed:
+ return False
+
+ return True
+
+
+def wait(threads, startFirst=False, poll=0.5):
+ if startFirst:
+ for thread in threads:
+ thread.start()
+
+ while not __areAllComplete(threads):
+        time.sleep(poll)
+
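+
+# A minimal usage sketch (fetch/urls are illustrative stand-ins):
+#   threads = [WorkerThread(fetch, params=(url,)) for url in urls]
+#   wait(threads, startFirst=True)
+#   results = [t.results for t in threads]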
+
+def foo(param1, param2):
+ print param1, param2
+ return "c"
+
+
+if __name__ == "__main__":
+
+ thread = WorkerThread(foo, params=("a", "b"))
+ thread.start()
+ while not thread.completed:
+        time.sleep(0.5)
+ print thread.results