You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by nc...@apache.org on 2022/06/02 05:16:19 UTC
[incubator-sdap-nexus] branch master updated: SDAP-372: Integrate AWS insitu API (#158)
This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 9d02c8c SDAP-372: Integrate AWS insitu API (#158)
9d02c8c is described below
commit 9d02c8c218784c7bfd51502a719236b83b4aed2d
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Wed Jun 1 22:16:15 2022 -0700
SDAP-372: Integrate AWS insitu API (#158)
* Updated doms conf
* Updated matchup to work with aws insitu providers
* Fixed platform retrieval code for insitu; the new schema returns platform as a dict
* Added MatchupDoms which queries the DOMS insitu endpoints instead of AWS endpoints
* Get provider name function works for both DOMS and CDMS insitu endpoints
* Fixed platform validation for new CDMS insitu platforms. Expanded CDMS insitu data fields
* Removed unused and unnecessary code
* Removed unnecessary exception
* Added changelogs
* Moved to a single top level changelog
---
CHANGELOG.md | 15 ++
analysis/webservice/algorithms/doms/config.py | 84 +++++++++++
analysis/webservice/algorithms_spark/Matchup.py | 133 ++++++++++-------
.../{Matchup.py => MatchupDoms.py} | 12 +-
analysis/webservice/algorithms_spark/__init__.py | 1 +
analysis/webservice/apidocs/openapi.yml | 161 ++++++++++++++++++++-
analysis/webservice/webapp.py | 4 +-
7 files changed, 346 insertions(+), 64 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..6daa66d
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,15 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+- SDAP-372: Updated matchup algorithm (`/match_spark`) to point to AWS insitu API endpoint
+- SDAP-372: Added new matchup endpoint `/match_spark_doms` that points to DOMS insitu endpoint
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
index 38c17ec..30d9a56 100644
--- a/analysis/webservice/algorithms/doms/config.py
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -13,6 +13,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+INSITU_API_ENDPOINT = 'https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination'  # returned by getEndpoint()
+
+INSITU_PROVIDER_MAP = [  # provider name -> its projects -> platform codes valid for each project
+ {
+ 'name': 'NCAR',
+ 'projects': [
+ {
+ 'name': 'ICOADS Release 3.0',
+ 'platforms': ['0', '16', '17', '30', '41', '42']
+ }
+ ]
+ },
+ {
+ 'name': 'Florida State University, COAPS',
+ 'projects': [
+ {
+ 'name': 'SAMOS',
+ 'platforms': ['30']
+ }
+ ]
+ },
+ {
+ 'name': 'Saildrone',
+ 'projects': [
+ {
+ 'name': '1021_atlantic',
+ 'platforms': ['3B']
+ },
+ {
+ 'name': 'antarctic_circumnavigation_2019',
+ 'platforms': ['3B']
+ },
+ {
+ 'name': 'atlantic_to_med_2019_to_2020',
+ 'platforms': ['3B']
+ },
+ {
+ 'name': 'shark-2018',
+ 'platforms': ['3B']
+ }
+ ]
+ }
+]
+
ENDPOINTS = [
{
"name": "samos",
@@ -103,8 +147,65 @@ except KeyError:
     pass
 
 
+def getEndpoint():
+    """Return the insitu query API URL (INSITU_API_ENDPOINT)."""
+    return INSITU_API_ENDPOINT
+
+
 def getEndpointByName(name):
     for endpoint in ENDPOINTS:
         if endpoint["name"].upper() == name.upper():
             return endpoint
     return None
+
+
+def validate_insitu_params(provider_name, project_name, platform_name):
+    """
+    Validate the provided params. The project should be within the
+    given provider and the platform should be appropriate for the
+    given project.
+
+    :param provider_name: provider 'name' as listed in INSITU_PROVIDER_MAP
+    :param project_name: project 'name' within that provider
+    :param platform_name: platform code to look up in that project
+    :return: True if the combination is listed, False otherwise
+    """
+    provider = next((provider for provider in INSITU_PROVIDER_MAP
+                     if provider['name'] == provider_name), None)
+
+    if provider is None:
+        return False
+
+    project = next((project for project in provider['projects']
+                    if project['name'] == project_name), None)
+
+    if project is None:
+        return False
+
+    return platform_name in project['platforms']
+
+
+def get_provider_name(project_name):
+    """
+    Return the insitu provider name for a project/dataset name.
+
+    Projects listed in INSITU_PROVIDER_MAP are matched exactly and
+    resolve to their provider's name. Otherwise the legacy DOMS
+    ENDPOINTS are matched case-insensitively (via getEndpointByName,
+    the lookup Matchup used before) and the endpoint's own name is
+    returned. Returns None when neither matches.
+    """
+    for provider in INSITU_PROVIDER_MAP:
+        if any(project['name'] == project_name
+               for project in provider['projects']):
+            return provider['name']
+
+    # Check DOMS endpoints as well. Eventually we should remove this so
+    # only CDMS insitu endpoints are used.
+    if project_name is None:
+        return None
+    endpoint = getEndpointByName(project_name)
+    if endpoint is not None:
+        return endpoint['name']
+
+    return None
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 7bc75a6..cf20c5f 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -190,12 +190,6 @@ class Matchup(NexusCalcSparkHandler):
platforms = request.get_argument('platforms', None)
if platforms is None:
raise NexusProcessingException(reason="'platforms' argument is required", code=400)
- try:
- p_validation = platforms.split(',')
- p_validation = [int(p) for p in p_validation]
- del p_validation
- except:
- raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
match_once = request.get_boolean_arg("matchOnce", default=False)
@@ -431,14 +425,7 @@ class DomsPoint(object):
@staticmethod
def from_edge_point(edge_point):
point = DomsPoint()
-
- try:
- x, y = wkt.loads(edge_point['point']).coords[0]
- except WKTReadingError:
- try:
- x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
- except ValueError:
- y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+ x, y = edge_point['longitude'], edge_point['latitude']  # new insitu schema: separate lon/lat keys replace the WKT 'point' string
point.longitude = x
point.latitude = y
@@ -450,15 +437,59 @@ class DomsPoint(object):
         point.device = edge_point.get('device')
         point.file_url = edge_point.get('fileurl')
 
+        # New insitu schema: platform is a dict -> keep its 'code'; None/str left as-is.
+        if isinstance(point.platform, dict) and 'code' in point.platform:
+            point.platform = edge_point.get('platform')['code']
         data_fields = [
+            'air_pressure',
+            'air_pressure_quality',
+            'air_temperature',
+            'air_temperature_quality',
+            'dew_point_temperature',
+            'dew_point_temperature_quality',
+            'downwelling_longwave_flux_in_air',
+            'downwelling_longwave_flux_in_air_quality',
+            'downwelling_longwave_radiance_in_air',
+            'downwelling_longwave_radiance_in_air_quality',
+            'downwelling_shortwave_flux_in_air',
+            'downwelling_shortwave_flux_in_air_quality',
+            'mass_concentration_of_chlorophyll_in_sea_water',
+            'mass_concentration_of_chlorophyll_in_sea_water_quality',
+            'rainfall_rate',
+            'rainfall_rate_quality',
+            'relative_humidity',
+            'relative_humidity_quality',
+            'sea_surface_salinity',
+            'sea_surface_salinity_quality',
+            'sea_surface_skin_temperature',
+            'sea_surface_skin_temperature_quality',
+            'sea_surface_subskin_temperature',
+            'sea_surface_subskin_temperature_quality',
+            'sea_surface_temperature',
+            'sea_surface_temperature_quality',
+            'sea_water_density',
+            'sea_water_density_quality',
+            'sea_water_electrical_conductivity',
+            'sea_water_electrical_conductivity_quality',
+            'sea_water_practical_salinity',
+            'sea_water_practical_salinity_quality',
+            'sea_water_salinity',
+            'sea_water_salinity_quality',
+            'sea_water_temperature',
+            'sea_water_temperature_quality',
+            'surface_downwelling_photosynthetic_photon_flux_in_air',
+            'surface_downwelling_photosynthetic_photon_flux_in_air_quality',
+            'wet_bulb_temperature',
+            'wet_bulb_temperature_quality',
+            'wind_speed',
+            'wind_speed_quality',
+            'wind_from_direction',
+            'wind_from_direction_quality',
+            'wind_to_direction',
+            'wind_to_direction_quality',
             'eastward_wind',
             'northward_wind',
-            'wind_direction',
-            'wind_speed',
-            'sea_water_temperature',
-            'sea_water_temperature_depth',
-            'sea_water_salinity',
-            'sea_water_salinity_depth',
+            'wind_component_quality',
         ]
         data = []
         # This is for in-situ secondary points
@@ -653,7 +684,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Query edge for all points within the spatial-temporal extents of this partition
- is_insitu_dataset = edge_endpoints.getEndpointByName(secondary_b.value)
+ is_insitu_dataset = edge_endpoints.get_provider_name(secondary_b.value) is not None  # None => neither a CDMS project nor a DOMS endpoint
if is_insitu_dataset:
the_time = datetime.now()
@@ -665,7 +696,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
- if edge_response['totalResults'] == 0:
+ if edge_response['total'] == 0:  # insitu API reports the result count as 'total' (was 'totalResults')
continue
r = edge_response['results']
for p in r:
@@ -679,14 +710,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
the_time = datetime.now()
matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
for n, edge_point in enumerate(edge_results):
- try:
- x, y = wkt.loads(edge_point['point']).coords[0]
- except WKTReadingError:
- try:
- x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
- except ValueError:
- y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
-
+ x, y = edge_point['longitude'], edge_point['latitude']  # new insitu schema: separate lon/lat keys
matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
else:
# Query nexus (cassandra? solr?) to find matching points.
@@ -808,45 +832,44 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
# Assume we were passed a properly formatted string
pass
- try:
- platform = platform.split(',')
- except AttributeError:
- # Assume we were passed a list
- pass
+ provider = edge_endpoints.get_provider_name(dataset)  # may be None; requests omits None-valued params
- params = {"startTime": startTime,
- "endTime": endTime,
- "bbox": bbox,
- "minDepth": depth_min,
- "maxDepth": depth_max,
- "platform": platform,
- "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+ params = {
+ "itemsPerPage": itemsPerPage,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "minDepth": depth_min,
+ "maxDepth": depth_max,
+ "provider": provider,
+ "project": dataset,
+ "platform": platform,
+ }
if variable is not None:
params["variable"] = variable
- dataset_url = edge_endpoints.getEndpointByName(dataset)['url']
- if session is not None:
- edge_request = session.get(dataset_url, params=params)
- else:
- edge_request = requests.get(dataset_url, params=params)
-
- edge_request.raise_for_status()
- edge_response = json.loads(edge_request.text)
+ edge_response = {}
# Get all edge results
- next_page_url = edge_response.get('next', None)
- while next_page_url is not None:
+ next_page_url = edge_endpoints.getEndpoint()
+ while next_page_url is not None and next_page_url != 'NA':  # 'NA' is treated as "no further pages"
+ logging.debug(f'Edge request {next_page_url}')
if session is not None:
- edge_page_request = session.get(next_page_url)
+ edge_page_request = session.get(next_page_url, params=params)
else:
- edge_page_request = requests.get(next_page_url)
+ edge_page_request = requests.get(next_page_url, params=params)
edge_page_request.raise_for_status()
+
edge_page_response = json.loads(edge_page_request.text)
- edge_response['results'].extend(edge_page_response['results'])
+ if not edge_response:  # first page: keep its metadata (e.g. 'total') and results
+ edge_response = edge_page_response
+ else:
+ edge_response['results'].extend(edge_page_response['results'])
next_page_url = edge_page_response.get('next', None)
+ params = {} # Remove params, they are already included in above URL
return edge_response
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/MatchupDoms.py
similarity index 99%
copy from analysis/webservice/algorithms_spark/Matchup.py
copy to analysis/webservice/algorithms_spark/MatchupDoms.py
index 7bc75a6..97b3b9e 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/MatchupDoms.py
@@ -52,9 +52,9 @@ def iso_time_to_epoch(str_time):
@nexus_handler
-class Matchup(NexusCalcSparkHandler):
- name = "Matchup"
- path = "/match_spark"
+class MatchupDoms(NexusCalcSparkHandler):
+ name = "MatchupDoms"
+ path = "/match_spark_doms"
description = "Match measurements between two or more datasets"
params = {
@@ -272,7 +272,7 @@ class Matchup(NexusCalcSparkHandler):
"numGriddedMatched": total_keys
}
- matches = Matchup.convert_to_matches(spark_result)
+ matches = MatchupDoms.convert_to_matches(spark_result)
def do_result_insert():
with ResultsStorage(self.config) as storage:
@@ -743,7 +743,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
search_parameter, radius_tolerance, aeqd_proj):
from nexustiles.model.nexusmodel import NexusPoint
- from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
+ from webservice.algorithms_spark.MatchupDoms import DomsPoint # Must import DomsPoint or Spark complains
# Load tile
try:
diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py
index e93b9eb..b849da9 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -21,6 +21,7 @@ from . import CorrMapSpark
from . import DailyDifferenceAverageSpark
from . import HofMoellerSpark
from . import Matchup
+from . import MatchupDoms  # imported so the @nexus_handler-decorated MatchupDoms class is loaded
from . import MaximaMinimaSpark
from . import NexusCalcSparkHandler
from . import TimeAvgMapSpark
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 8a7392b..2754463 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -14,7 +14,7 @@ tags:
- name: Subsetting
description: Data Subsetting API
paths:
- /match_spark:
+ /match_spark_doms:  # legacy matchup that queries the DOMS insitu endpoints (MatchupDoms)
get:
summary: Execute matchup request
operationId: matchup
@@ -174,6 +174,165 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/Error'
+ /match_spark:
+ get:
+ summary: Execute matchup request
+ operationId: matchupCdms
+ tags:
+ - Matchup
+ parameters:
+ - in: query
+ name: primary
+ description: |
+ The primary dataset used to find matches for. One of the
+ satellite "shortName" as supplied by /domslist endpoint.
+ required: true
+ schema:
+ type: string
+ x-dspopulate:
+ - satellite
+ example: avhrr-l4-glob-v2-daily-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020
+ - in: query
+ name: secondary
+ description: |
+ The dataset(s) being searched for measurements that match
+ the measurements in primary. One or more (comma-separated)
+ insitu project names (e.g. "ICOADS Release 3.0", "SAMOS") or
+ satellite "shortName" as supplied by the /domslist endpoint.
+ required: true
+ schema:
+ type: string
+ x-dspopulate:
+ - satellite
+ - insitu
+ example: 'ICOADS Release 3.0'
+ - in: query
+ name: startTime
+ description: |
+ Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds
+ since epoch
+ required: true
+ schema:
+ type: string
+ format: date-time
+ example: '2012-09-25T00:00:00Z'
+ - in: query
+ name: endTime
+ description: |
+ Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds
+ since epoch
+ required: true
+ schema:
+ type: string
+ format: date-time
+ example: '2012-09-30T23:59:59Z'
+ - in: query
+ name: b
+ description: |
+ Minimum (Western) Longitude, Minimum (Southern) Latitude,
+ Maximum (Eastern) Longitude, Maximum (Northern) Latitude
+ required: true
+ schema:
+ type: string
+ example: -45,15,-30,30
+ - in: query
+ name: platforms
+ description: Platform codes (comma-separated) to include for matchup consideration
+ required: true
+ schema:
+ type: string
+ example: 30,41,42
+ - in: query
+ name: depthMin
+ description: |
+ Minimum depth of measurements allowed to be considered for
+ matchup
+ required: false
+ schema:
+ type: integer
+ example: 0
+ - in: query
+ name: depthMax
+ description: |
+ Maximum depth of measurements allowed to be considered for
+ matchup
+ required: false
+ schema:
+ type: integer
+ example: 5
+ - in: query
+ name: tt
+ description: |
+ Tolerance in time (seconds) when comparing two measurements.
+ required: false
+ schema:
+ type: integer
+ default: 86400
+ example: 86400
+ - in: query
+ name: rt
+ description: |
+ Tolerance in radius (meters) when comparing two
+ measurements.
+ required: false
+ schema:
+ type: number
+ default: 1000.0
+ example: 1000.0
+ - in: query
+ name: parameter
+ description: |
+ The parameter of interest used for the match up.
+ required: false
+ schema:
+ type: string
+ default: sst
+ example: sst
+ - in: query
+ name: matchOnce
+ description: |
+ True/False flag used to determine if more than one match
+ per primary point is returned. If true, only the nearest
+ point will be returned for each primary point. If false,
+ all points within the tolerances will be returned for each
+ primary point.
+ required: false
+ schema:
+ type: boolean
+ default: false
+ example: false
+ - in: query
+ name: resultSizeLimit
+ description: |
+ Optional integer value that limits the number of results
+ returned from the matchup. If the number of primary matches
+ is greater than this limit, the service will respond with
+ (HTTP 202 Accepted) and an empty response body. A value of
+ 0 means return all results.
+ required: false
+ schema:
+ type: integer
+ default: 500
+ example: 500
+ responses:
+ '200':
+ description: Successful operation
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/MatchupResponse'
+ '400':
+ description: Bad request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Error'
+ '500':
+ description: Server error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Error'
/domssubset:
get:
summary: Subset DOMS sources given the search domain
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index abb09b8..7588dac 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -119,7 +119,9 @@ if __name__ == "__main__":
tile_service_factory=tile_service_factory,
sc=spark_context,
thread_pool=request_thread_pool)
- if clazzWrapper == webservice.algorithms_spark.Matchup.Matchup or issubclass(clazzWrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler):
+ if clazzWrapper == webservice.algorithms_spark.Matchup.Matchup \
+ or clazzWrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms \
+ or issubclass(clazzWrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler):
args['config'] = algorithm_config
handlers.append((clazzWrapper.path,