You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by nc...@apache.org on 2022/06/02 05:16:19 UTC

[incubator-sdap-nexus] branch master updated: SDAP-372: Integrate AWS insitu API (#158)

This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d02c8c  SDAP-372: Integrate AWS insitu API (#158)
9d02c8c is described below

commit 9d02c8c218784c7bfd51502a719236b83b4aed2d
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Wed Jun 1 22:16:15 2022 -0700

    SDAP-372: Integrate AWS insitu API (#158)
    
    * Updated doms conf
    
    * Updated matchup to work with aws insitu providers
    
    * Fix platform retrieval code for insitu. New schema is dict
    
    * Added MatchupDoms which queries the DOMS insitu endpoints instead of AWS endpoints
    
    * Get provider name function works for both DOMS and CDMS insitu endpoints
    
    * Fixed platform validation for new CDMS insitu platforms. Expanded CDMS insitu data fields
    
    * Removed unused and unnecessary code
    
    * Removed unnecessary exception
    
    * Added changelogs
    
    * Moved to a single top level changelog
---
 CHANGELOG.md                                       |  15 ++
 analysis/webservice/algorithms/doms/config.py      |  84 +++++++++++
 analysis/webservice/algorithms_spark/Matchup.py    | 133 ++++++++++-------
 .../{Matchup.py => MatchupDoms.py}                 |  12 +-
 analysis/webservice/algorithms_spark/__init__.py   |   1 +
 analysis/webservice/apidocs/openapi.yml            | 161 ++++++++++++++++++++-
 analysis/webservice/webapp.py                      |   4 +-
 7 files changed, 346 insertions(+), 64 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..6daa66d
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,15 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+- SDAP-372: Updated matchup algorithm to point to AWS insitu API endpoint
+- SDAP-372: Added new matchup endpoint `/match_spark_doms` that points to the DOMS insitu endpoints
+### Changed
+### Deprecated
+### Removed
+### Fixed
+### Security
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
index 38c17ec..30d9a56 100644
--- a/analysis/webservice/algorithms/doms/config.py
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -13,6 +13,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+INSITU_API_ENDPOINT = 'https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination'
+
+INSITU_PROVIDER_MAP = [
+    {
+        'name': 'NCAR',
+        'projects': [
+            {
+                'name': 'ICOADS Release 3.0',
+                'platforms': ['0', '16', '17', '30', '41', '42']
+            }
+        ]
+    },
+    {
+        'name': 'Florida State University, COAPS',
+        'projects': [
+            {
+                'name': 'SAMOS',
+                'platforms': ['30']
+            }
+        ]
+    },
+    {
+        'name': 'Saildrone',
+        'projects': [
+            {
+                'name': '1021_atlantic',
+                'platforms': ['3B']
+            },
+            {
+                'name': 'antarctic_circumnavigation_2019',
+                'platforms': ['3B']
+            },
+            {
+                'name': 'atlantic_to_med_2019_to_2020',
+                'platforms': ['3B']
+            },
+            {
+                'name': 'shark-2018',
+                'platforms': ['3B']
+            }
+        ]
+    }
+]
+
 ENDPOINTS = [
     {
         "name": "samos",
@@ -103,8 +147,48 @@ except KeyError:
     pass
 
 
+def getEndpoint():
+    return INSITU_API_ENDPOINT
+
+
 def getEndpointByName(name):
     for endpoint in ENDPOINTS:
         if endpoint["name"].upper() == name.upper():
             return endpoint
     return None
+
+def validate_insitu_params(provider_name, project_name, platform_name):
+    """
+    Validate the provided params. The project should be within the
+    given provider and the platform should be appropriate for the
+    given project.
+    """
+    provider = next((provider for provider in INSITU_PROVIDER_MAP
+                     if provider['name'] == provider_name), None)
+
+    if provider is None:
+        return False
+
+    project = next((project for project in provider['projects']
+                    if project_name == project['name']), None)
+
+    if project is None:
+        return False
+
+    return platform_name in project['platforms']
+
+
+def get_provider_name(project_name):
+    provider = next((provider for provider in INSITU_PROVIDER_MAP
+                     if project_name in map(lambda project: project['name'], provider['projects'])), None)
+
+    if provider is not None:
+        return provider['name']
+
+    # Check DOMS endpoints as well. Eventually we should remove this so
+    # only CDMS insitu endpoints are used.
+    provider = next((provider for provider in ENDPOINTS
+                     if provider['name'] == project_name), None)
+    if provider is not None:
+        return provider['name']
+
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 7bc75a6..cf20c5f 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -190,12 +190,6 @@ class Matchup(NexusCalcSparkHandler):
         platforms = request.get_argument('platforms', None)
         if platforms is None:
             raise NexusProcessingException(reason="'platforms' argument is required", code=400)
-        try:
-            p_validation = platforms.split(',')
-            p_validation = [int(p) for p in p_validation]
-            del p_validation
-        except:
-            raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
 
         match_once = request.get_boolean_arg("matchOnce", default=False)
 
@@ -431,14 +425,7 @@ class DomsPoint(object):
     @staticmethod
     def from_edge_point(edge_point):
         point = DomsPoint()
-
-        try:
-            x, y = wkt.loads(edge_point['point']).coords[0]
-        except WKTReadingError:
-            try:
-                x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
-            except ValueError:
-                y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+        x, y = edge_point['longitude'], edge_point['latitude']
 
         point.longitude = x
         point.latitude = y
@@ -450,15 +437,59 @@ class DomsPoint(object):
         point.device = edge_point.get('device')
         point.file_url = edge_point.get('fileurl')
 
+        if 'code' in point.platform:
+            point.platform = edge_point.get('platform')['code']
+
         data_fields = [
+            'air_pressure',
+            'air_pressure_quality',
+            'air_temperature',
+            'air_temperature_quality',
+            'dew_point_temperature',
+            'dew_point_temperature_quality',
+            'downwelling_longwave_flux_in_air',
+            'downwelling_longwave_flux_in_air_quality',
+            'downwelling_longwave_radiance_in_air',
+            'downwelling_longwave_radiance_in_air_quality',
+            'downwelling_shortwave_flux_in_air',
+            'downwelling_shortwave_flux_in_air_quality',
+            'mass_concentration_of_chlorophyll_in_sea_water',
+            'mass_concentration_of_chlorophyll_in_sea_water_quality',
+            'rainfall_rate',
+            'rainfall_rate_quality',
+            'relative_humidity',
+            'relative_humidity_quality',
+            'sea_surface_salinity',
+            'sea_surface_salinity_quality',
+            'sea_surface_skin_temperature',
+            'sea_surface_skin_temperature_quality',
+            'sea_surface_subskin_temperature',
+            'sea_surface_subskin_temperature_quality',
+            'sea_surface_temperature',
+            'sea_surface_temperature_quality',
+            'sea_water_density',
+            'sea_water_density_quality',
+            'sea_water_electrical_conductivity',
+            'sea_water_electrical_conductivity_quality',
+            'sea_water_practical_salinity',
+            'sea_water_practical_salinity_quality',
+            'sea_water_salinity',
+            'sea_water_salinity_quality',
+            'sea_water_temperature',
+            'sea_water_temperature_quality',
+            'surface_downwelling_photosynthetic_photon_flux_in_air',
+            'surface_downwelling_photosynthetic_photon_flux_in_air_quality',
+            'wet_bulb_temperature',
+            'wet_bulb_temperature_quality',
+            'wind_speed',
+            'wind_speed_quality',
+            'wind_from_direction',
+            'wind_from_direction_quality',
+            'wind_to_direction',
+            'wind_to_direction_quality',
             'eastward_wind',
             'northward_wind',
-            'wind_direction',
-            'wind_speed',
-            'sea_water_temperature',
-            'sea_water_temperature_depth',
-            'sea_water_salinity',
-            'sea_water_salinity_depth',
+            'wind_component_quality'
         ]
         data = []
         # This is for in-situ secondary points
@@ -653,7 +684,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
         str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
 
     # Query edge for all points within the spatial-temporal extents of this partition
-    is_insitu_dataset = edge_endpoints.getEndpointByName(secondary_b.value)
+    is_insitu_dataset = edge_endpoints.get_provider_name(secondary_b.value) is not None
 
     if is_insitu_dataset:
         the_time = datetime.now()
@@ -665,7 +696,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
                     [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
                 edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
                                            platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
-                if edge_response['totalResults'] == 0:
+                if edge_response['total'] == 0:
                     continue
                 r = edge_response['results']
                 for p in r:
@@ -679,14 +710,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
         the_time = datetime.now()
         matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
         for n, edge_point in enumerate(edge_results):
-            try:
-                x, y = wkt.loads(edge_point['point']).coords[0]
-            except WKTReadingError:
-                try:
-                    x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
-                except ValueError:
-                    y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
-
+            x, y = edge_point['longitude'], edge_point['latitude']
             matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
     else:
         # Query nexus (cassandra? solr?) to find matching points.
@@ -808,45 +832,44 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
         # Assume we were passed a properly formatted string
         pass
 
-    try:
-        platform = platform.split(',')
-    except AttributeError:
-        # Assume we were passed a list
-        pass
+    provider = edge_endpoints.get_provider_name(dataset)
 
-    params = {"startTime": startTime,
-              "endTime": endTime,
-              "bbox": bbox,
-              "minDepth": depth_min,
-              "maxDepth": depth_max,
-              "platform": platform,
-              "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+    params = {
+        "itemsPerPage": itemsPerPage,
+        "startTime": startTime,
+        "endTime": endTime,
+        "bbox": bbox,
+        "minDepth": depth_min,
+        "maxDepth": depth_max,
+        "provider": provider,
+        "project": dataset,
+        "platform": platform,
+    }
 
     if variable is not None:
         params["variable"] = variable
 
-    dataset_url = edge_endpoints.getEndpointByName(dataset)['url']
-    if session is not None:
-        edge_request = session.get(dataset_url, params=params)
-    else:
-        edge_request = requests.get(dataset_url, params=params)
-
-    edge_request.raise_for_status()
-    edge_response = json.loads(edge_request.text)
+    edge_response = {}
 
     # Get all edge results
-    next_page_url = edge_response.get('next', None)
-    while next_page_url is not None:
+    next_page_url = edge_endpoints.getEndpoint()
+    while next_page_url is not None and next_page_url != 'NA':
+        logging.debug(f'Edge request {next_page_url}')
         if session is not None:
-            edge_page_request = session.get(next_page_url)
+            edge_page_request = session.get(next_page_url, params=params)
         else:
-            edge_page_request = requests.get(next_page_url)
+            edge_page_request = requests.get(next_page_url, params=params)
 
         edge_page_request.raise_for_status()
+
         edge_page_response = json.loads(edge_page_request.text)
 
-        edge_response['results'].extend(edge_page_response['results'])
+        if not edge_response:
+            edge_response = edge_page_response
+        else:
+            edge_response['results'].extend(edge_page_response['results'])
 
         next_page_url = edge_page_response.get('next', None)
+        params = {}  # Remove params, they are already included in above URL
 
     return edge_response
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/MatchupDoms.py
similarity index 99%
copy from analysis/webservice/algorithms_spark/Matchup.py
copy to analysis/webservice/algorithms_spark/MatchupDoms.py
index 7bc75a6..97b3b9e 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/MatchupDoms.py
@@ -52,9 +52,9 @@ def iso_time_to_epoch(str_time):
 
 
 @nexus_handler
-class Matchup(NexusCalcSparkHandler):
-    name = "Matchup"
-    path = "/match_spark"
+class MatchupDoms(NexusCalcSparkHandler):
+    name = "MatchupDoms"
+    path = "/match_spark_doms"
     description = "Match measurements between two or more datasets"
 
     params = {
@@ -272,7 +272,7 @@ class Matchup(NexusCalcSparkHandler):
             "numGriddedMatched": total_keys
         }
 
-        matches = Matchup.convert_to_matches(spark_result)
+        matches = MatchupDoms.convert_to_matches(spark_result)
 
         def do_result_insert():
             with ResultsStorage(self.config) as storage:
@@ -324,7 +324,6 @@ class DataPoint:
     """
     Represents a single point of data. This is used to construct the
     output of the matchup algorithm.
-
     :attribute variable_name: The name of the NetCDF variable.
     :attribute cf_variable_name: The CF standard_name of the
     NetCDF variable. This will be None if the standard_name does not
@@ -364,7 +363,6 @@ class DomsPoint(object):
         the correct device is. This method will only be used for
         satellite measurements, so the only options are 'scatterometers'
         or 'radiometers'
-
         :param variables: List of variable names
         :return: device id integer
         """
@@ -743,7 +741,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
 def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
                                   search_parameter, radius_tolerance, aeqd_proj):
     from nexustiles.model.nexusmodel import NexusPoint
-    from webservice.algorithms_spark.Matchup import DomsPoint  # Must import DomsPoint or Spark complains
+    from webservice.algorithms_spark.MatchupDoms import DomsPoint  # Must import DomsPoint or Spark complains
 
     # Load tile
     try:
diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py
index e93b9eb..b849da9 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -21,6 +21,7 @@ from . import CorrMapSpark
 from . import DailyDifferenceAverageSpark
 from . import HofMoellerSpark
 from . import Matchup
+from . import MatchupDoms
 from . import MaximaMinimaSpark
 from . import NexusCalcSparkHandler
 from . import TimeAvgMapSpark
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 8a7392b..2754463 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -14,7 +14,7 @@ tags:
   - name: Subsetting
     description: Data Subsetting API
 paths:
-  /match_spark:
+  /match_spark_doms:
     get:
       summary: Execute matchup request
       operationId: matchup
@@ -174,6 +174,165 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/Error'
+  /match_spark:
+    get:
+      summary: Execute matchup request
+      operationId: matchupCdms
+      tags:
+        - Matchup
+      parameters:
+        - in: query
+          name: primary
+          description: |
+            The primary dataset used to find matches for. One of the
+            satellite "shortName" as supplied by /domslist endpoint.
+          required: true
+          schema:
+            type: string
+            x-dspopulate:
+              - satellite
+          example: avhrr-l4-glob-v2-daily-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020
+        - in: query
+          name: secondary
+          description: |
+            The dataset(s) being searched for measurements that match
+            the measurements in primary. One or more (comma-separated)
+            of the insitu project names (e.g. "SAMOS", "ICOADS Release 3.0")
+            or satellite "name" values as supplied by https://doms.jpl.nasa.gov/domslist
+          required: true
+          schema:
+            type: string
+            x-dspopulate:
+              - satellite
+              - insitu
+          example: icoads
+        - in: query
+          name: startTime
+          description: |
+            Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds
+            since epoch
+          required: true
+          schema:
+            type: string
+            format: date-time
+          example: '2012-09-25T00:00:00Z'
+        - in: query
+          name: endTime
+          description: |
+            Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds
+            since epoch
+          required: true
+          schema:
+            type: string
+            format: date-time
+          example: '2012-09-30T23:59:59Z'
+        - in: query
+          name: b
+          description: |
+            Minimum (Western) Longitude, Minimum (Southern) Latitude,
+            Maximum (Eastern) Longitude, Maximum (Northern) Latitude
+          required: true
+          schema:
+            type: string
+          example: -45,15,-30,30
+        - in: query
+          name: platforms
+          description: Comma-separated list of insitu platform codes to include for matchup consideration
+          required: true
+          schema:
+            type: string
+          example: 30,41,42
+        - in: query
+          name: depthMin
+          description: |
+            Minimum depth of measurements allowed to be considered for
+            matchup
+          required: false
+          schema:
+            type: integer
+          example: 0
+        - in: query
+          name: depthMax
+          description: |
+            Maximum depth of measurements allowed to be considered for
+            matchup
+          required: false
+          schema:
+            type: integer
+          example: 5
+        - in: query
+          name: tt
+          description: |
+            Tolerance in time (seconds) when comparing two measurements.
+          required: false
+          schema:
+            type: integer
+            default: 86400
+          example: 86400
+        - in: query
+          name: rt
+          description: |
+            Tolerance in radius (meters) when comparing two
+            measurements.
+          required: false
+          schema:
+            type: number
+            default: 1000.0
+          example: 1000.0
+        - in: query
+          name: parameter
+          description: |
+            The parameter of interest used for the match up.
+          required: false
+          schema:
+            type: string
+            default: sst
+          example: sst
+        - in: query
+          name: matchOnce
+          description: |
+            True/False flag used to determine if more than one match
+            per primary point is returned. If true, only the nearest
+            point will be returned for each primary point. If false,
+            all points within the tolerances will be returned for each
+            primary point.
+          required: false
+          schema:
+            type: boolean
+            default: false
+          example: false
+        - in: query
+          name: resultSizeLimit
+          description: |
+            Optional integer value that limits the number of results
+            returned from the matchup. If the number of primary matches
+            is greater than this limit, the service will respond with
+            (HTTP 202 Accepted) and an empty response body. A value of
+            0 means return all results.
+          required: false
+          schema:
+            type: integer
+            default: 500
+          example: 500
+      responses:
+        '200':
+          description: Successful operation
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/MatchupResponse'
+        '400':
+          description: Bad request
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Error'
+        '500':
+          description: Server error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Error'
   /domssubset:
     get:
       summary: Subset DOMS sources given the search domain
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index abb09b8..7588dac 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -119,7 +119,9 @@ if __name__ == "__main__":
                         tile_service_factory=tile_service_factory,
                         sc=spark_context,
                         thread_pool=request_thread_pool)
-            if clazzWrapper == webservice.algorithms_spark.Matchup.Matchup or issubclass(clazzWrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler):
+            if clazzWrapper == webservice.algorithms_spark.Matchup.Matchup \
+                    or clazzWrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms \
+                    or issubclass(clazzWrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler):
                 args['config'] = algorithm_config
 
             handlers.append((clazzWrapper.path,