You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/12/22 20:44:36 UTC

(incubator-sdap-nexus) 01/01: insitu DOMS+Parquet support. Pydantic for model normalization

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-505
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 3c9aecad8bb18c77404828e2aee600932eba146f
Author: skorper <st...@gmail.com>
AuthorDate: Fri Dec 22 12:44:23 2023 -0800

    insitu DOMS+Parquet support. Pydantic for model normalization
---
 analysis/tests/algorithms_spark/test_insitu.py   |  80 ++++++++++++++
 analysis/tests/data/edge_insitu_response.json    |  42 ++++++++
 analysis/tests/data/parquet_insitu_response.json |  73 +++++++++++++
 analysis/webservice/algorithms/doms/config.py    |   4 +-
 analysis/webservice/algorithms/doms/insitu.py    | 132 +++++++++++++++++------
 5 files changed, 298 insertions(+), 33 deletions(-)

diff --git a/analysis/tests/algorithms_spark/test_insitu.py b/analysis/tests/algorithms_spark/test_insitu.py
new file mode 100644
index 0000000..b879945
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_insitu.py
@@ -0,0 +1,80 @@
+from webservice.algorithms.doms import insitu
+import json
+
+
+"""
+From Nga: 
+
+https://doms.coaps.fsu.edu/ws/search/samos_cdms?startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&itemsPerPage=1
+
+https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida%20State%20University%2C%20COAPS&project=SAMOS&startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&minDepth=-1000&maxDepth=1000
+"""
+
+
+def test_query_insitu():
+    # Smoke test against the live insitu service: prints the response, asserts nothing.
+    # Reference request (it asks for itemsPerPage=20000; the call below passes 1000):
+    # https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?
+    # itemsPerPage=20000
+    # startTime=2017-05-01T01:54:45Z
+    # endTime=2017-05-01T16:41:01Z
+    # bbox=-100.0,20.0,-79.0,30.0
+    # minDepth=-20.0, maxDepth=10.0
+    # provider=Florida+State+University,+COAPS, project=SAMOS
+    # platform=30
+    response = insitu.query_insitu(
+        dataset='SAMOS',
+        variable=None,
+        start_time='2017-05-01T01:54:45Z',
+        end_time='2017-05-01T16:41:01Z',
+        bbox='-100.0,20.0,-79.0,30.0',
+        platform=30,
+        depth_min=-20.0,
+        depth_max=10.0,
+        items_per_page=1000
+    )
+
+    print(f'{response=}')
+    # assert False  # NOTE(review): leftover debug toggle, presumably to make pytest show the print output
+
+def test_query_insitu_doms():
+    # Smoke test against the live insitu service: prints the response, asserts nothing.
+    # Uses the same time range, bbox and depths as test_query_insitu, but with platform=None.
+    # NOTE(review): the sample request noted here originally does not match the call below:
+    # http://doms.coaps.fsu.edu/ws/search/samos?startTime=2012-08-01T00:00:00Z&endTime=2013-10-31T23:59:59Z&bbox=-45,15,-30,30
+    response = insitu.query_insitu(
+        dataset='SAMOS',
+        variable=None,
+        start_time='2017-05-01T01:54:45Z',
+        end_time='2017-05-01T16:41:01Z',
+        bbox='-100.0,20.0,-79.0,30.0',
+        platform=None,  # 30 seems to reduce the results to 0
+        depth_min=-20.0,
+        depth_max=10.0,
+        items_per_page=1000
+    )
+
+    print(f'{response=}')
+
+
+def test_insitu_result_serialization():
+    """Load the edge and Parquet fixtures and check that InsituResult normalizes their differing keys."""
+    from pathlib import Path  # local import; fixtures resolve relative to this file, not the cwd
+    data_dir = Path(__file__).resolve().parent.parent / 'data'
+    edge_result = json.loads((data_dir / 'edge_insitu_response.json').read_text(encoding='utf-8'))
+    parquet_result = json.loads((data_dir / 'parquet_insitu_response.json').read_text(encoding='utf-8'))
+
+    parquet_result_model = insitu.InsituResult(**parquet_result)
+    edge_result_model = insitu.InsituResult(**edge_result)
+    # Parquet nests the platform as {"code": "30"}; the edge fixture carries null.
+    assert parquet_result_model.results[0].platform == 30
+    assert edge_result_model.results[0].platform is None
+    # "meta" (Parquet) and "metadata" (edge) land on the same attribute.
+    assert parquet_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'
+    assert edge_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'
+    # "total" (Parquet) and "totalResults" (edge) land on total_results.
+    assert parquet_result_model.total_results == 4
+    assert edge_result_model.total_results == 4
+
+
+
diff --git a/analysis/tests/data/edge_insitu_response.json b/analysis/tests/data/edge_insitu_response.json
new file mode 100644
index 0000000..7537d78
--- /dev/null
+++ b/analysis/tests/data/edge_insitu_response.json
@@ -0,0 +1,42 @@
+{
+  "last": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=4&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "next": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=1&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "first": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=0&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "results": [
+    {
+      "time": "2018-01-14T17:53:00Z",
+      "latitude": 30,
+      "longitude": -88.43,
+      "depth": -99999,
+      "air_pressure": null,
+      "air_pressure_quality": null,
+      "air_temperature": 4.81,
+      "air_temperature_quality": 1,
+      "dew_point_temperature": null,
+      "dew_point_temperature_quality": null,
+      "downwelling_shortwave_flux_in_air": null,
+      "downwelling_shortwave_flux_in_air_quality": null,
+      "downwelling_longwave_flux_in_air": null,
+      "sea_water_temperature": 17.3,
+      "sea_water_temperature_quality": 1,
+      "eastward_wind": null,
+      "northward_wind": null,
+      "relative_humidity": null,
+      "relative_humidity_quality": null,
+      "sea_water_salinity": 36.02,
+      "sea_water_salinity_quality": 1,
+      "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+      "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+      "wind_speed": null,
+      "wind_speed_quality": null,
+      "wind_component_quality": null,
+      "platform": null,
+      "device": null,
+      "mission": null,
+      "metadata": "WTEO_20180114v30001_0048-99999.0"
+    }
+  ],
+  "totalResults": 4,
+  "startIndex": 0,
+  "itemsPerPage": 1
+}
\ No newline at end of file
diff --git a/analysis/tests/data/parquet_insitu_response.json b/analysis/tests/data/parquet_insitu_response.json
new file mode 100644
index 0000000..0979db7
--- /dev/null
+++ b/analysis/tests/data/parquet_insitu_response.json
@@ -0,0 +1,73 @@
+{
+  "total": 4,
+  "results": [
+    {
+      "time": "2018-01-14T17:53:00Z",
+      "latitude": 30,
+      "longitude": -88.43,
+      "depth": -99999,
+      "air_pressure": null,
+      "air_pressure_quality": null,
+      "air_temperature": 4.81,
+      "air_temperature_quality": 1,
+      "dew_point_temperature": null,
+      "dew_point_temperature_quality": null,
+      "downwelling_longwave_flux_in_air": null,
+      "downwelling_longwave_flux_in_air_quality": null,
+      "downwelling_longwave_radiance_in_air": null,
+      "downwelling_longwave_radiance_in_air_quality": null,
+      "downwelling_shortwave_flux_in_air": null,
+      "downwelling_shortwave_flux_in_air_quality": null,
+      "mass_concentration_of_chlorophyll_in_sea_water": null,
+      "mass_concentration_of_chlorophyll_in_sea_water_quality": null,
+      "rainfall_rate": null,
+      "rainfall_rate_quality": null,
+      "relative_humidity": null,
+      "relative_humidity_quality": null,
+      "sea_surface_salinity": null,
+      "sea_surface_salinity_quality": null,
+      "sea_surface_skin_temperature": null,
+      "sea_surface_skin_temperature_quality": null,
+      "sea_surface_subskin_temperature": null,
+      "sea_surface_subskin_temperature_quality": null,
+      "sea_surface_temperature": null,
+      "sea_surface_temperature_quality": null,
+      "sea_water_density": null,
+      "sea_water_density_quality": null,
+      "sea_water_electrical_conductivity": null,
+      "sea_water_electrical_conductivity_quality": null,
+      "sea_water_practical_salinity": null,
+      "sea_water_practical_salinity_quality": null,
+      "sea_water_salinity": 36.02,
+      "sea_water_salinity_quality": 1,
+      "sea_water_temperature": 17.3,
+      "sea_water_temperature_quality": 1,
+      "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+      "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+      "wet_bulb_temperature": null,
+      "wet_bulb_temperature_quality": null,
+      "wind_speed": null,
+      "wind_speed_quality": null,
+      "wind_from_direction": null,
+      "wind_to_direction": null,
+      "wind_from_direction_quality": null,
+      "wind_to_direction_quality": null,
+      "eastward_wind": null,
+      "northward_wind": null,
+      "wind_component_quality": null,
+      "platform": {
+        "code": "30"
+      },
+      "device": null,
+      "meta": "WTEO_20180114v30001_0048-99999.0",
+      "provider": "Florida State University, COAPS",
+      "project": "SAMOS",
+      "platform_code": "30",
+      "job_id": "47a61aac-6bc2-41a2-9568-920715c65b66"
+    }
+  ],
+  "last": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=3&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "first": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "next": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=1&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "prev": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000"
+}
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
index 8ccc748..f16c1ed 100644
--- a/analysis/webservice/algorithms/doms/config.py
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -103,7 +103,7 @@ INSITU_PROVIDER_MAP = [
 ENDPOINTS = [
     {
         "name": "samos",
-        "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+        "url": "https://doms.coaps.fsu.edu/ws/search/samos",
         "fetchParallel": True,
         "fetchThreads": 8,
         "itemsPerPage": 1000,
@@ -150,7 +150,7 @@ try:
         ENDPOINTS = [
             {
                 "name": "samos",
-                "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+                "url": "https://doms.coaps.fsu.edu/ws/search/samos",
                 "fetchParallel": True,
                 "fetchThreads": 8,
                 "itemsPerPage": 1000,
diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py
index ae35b4a..55cbe1c 100644
--- a/analysis/webservice/algorithms/doms/insitu.py
+++ b/analysis/webservice/algorithms/doms/insitu.py
@@ -20,6 +20,99 @@ import logging
 import requests
 from datetime import datetime
 from webservice.algorithms.doms import config as insitu_endpoints
+from pydantic import BaseModel, Field, AliasChoices, ConfigDict, field_validator
+from typing import List, Optional, Dict, Union
+
+
+class InsituRecord(BaseModel):
+    """A single insitu result record, accepting both the edge and the Parquet field layouts."""
+    # Keys not declared below (the measurement variables) are kept and exposed via model_extra.
+    model_config = ConfigDict(extra='allow')
+
+    time: str
+    latitude: float
+    longitude: float
+    depth: float
+    platform: Optional[int] = None
+    device: Optional[float] = None  # default added so a response without "device" still validates
+    mission: Optional[str] = None
+    metadata: str = Field(validation_alias=AliasChoices('meta', 'metadata'))  # Parquet sends "meta", edge "metadata"
+    provider: Optional[str] = None
+    project: Optional[str] = None
+    platform_code: Optional[str] = None
+    job_id: Optional[str] = None
+
+    @field_validator('platform', mode='before')
+    @classmethod
+    def transform(cls, raw_platform: Union[int, str, Dict[str, str], None]) -> Union[int, str, None]:
+        """Unwrap a {"code": ...} platform object to its code; pass any other value through unchanged."""
+        return raw_platform.get('code') if isinstance(raw_platform, dict) else raw_platform
+
+    def get_variables(self):
+        """Return the undeclared fields captured by extra='allow' (pydantic's model_extra)."""
+        return self.model_extra
+
+
+class InsituResult(BaseModel):  # Envelope of one insitu API response: paging links, total count, records.
+    last: Optional[str] = None  # paging links are None when the response omits one
+    prev: Optional[str] = None  # (e.g. the edge fixture has no "prev"); the old default was the `str` type itself
+    next: Optional[str] = None
+    first: Optional[str] = None
+    total_results: int = Field(validation_alias=AliasChoices('total', 'totalResults'))  # Parquet: "total", edge: "totalResults"
+    results: List[InsituRecord]
+
+
+def query_insitu_edge(dataset, params, session=None, stats=True):
+    """Query the endpoint configured under the name `dataset`, adding the lowercased `stats` flag to `params`."""
+    # Build a copy rather than params.update(...) so the caller's dict is not mutated as a side effect.
+    edge_params = {**params, 'stats': str(stats).lower()}
+    endpoint_url = insitu_endpoints.getEndpointByName(dataset)['url']
+    return query_insitu_api(session, endpoint_url, edge_params)  # TODO convert to shared entity
+
+
+def query_insitu_parquet(dataset, params, session=None, **kwargs):
+    """
+    Query the provider/dataset endpoint for `dataset`, adding the provider and
+    project names looked up from the endpoint config to `params`. Extra keyword
+    arguments are accepted and ignored.
+    """
+    provider = insitu_endpoints.get_provider_name(dataset)
+    # Merge into a copy so the caller's dict is not mutated as a side effect.
+    parquet_params = {**params, 'provider': provider, 'project': insitu_endpoints.get_project_name(dataset)}
+    return query_insitu_api(session, insitu_endpoints.getEndpoint(provider, dataset), parquet_params)  # TODO convert to shared entity
+
+
+def query_insitu_api(session, endpoint, params):
+    """
+    Fetch all pages of insitu results starting at `endpoint` and merge them.
+    :param session: object with a requests-style `get`; `requests.get` is used when None
+    :param endpoint: URL of the first page
+    :param params: query parameters, sent with the first request only
+    :return: JSON of the first page with the `results` of each later page appended
+    :raises: whatever `raise_for_status()` raises for a failed request
+    """
+    http_get = requests.get if session is None else session.get
+    insitu_response = {}
+    next_page_url = endpoint
+    while next_page_url is not None and next_page_url != 'NA':  # a 'next' of 'NA' also ends paging
+        response = http_get(next_page_url, params=params)
+        logging.info(f'Insitu request {response.url}')
+        response.raise_for_status()
+        insitu_page_response = response.json()
+
+        if insitu_response:
+            insitu_response['results'].extend(insitu_page_response['results'])
+        else:
+            insitu_response = insitu_page_response
+        next_page_url = insitu_page_response.get('next', None)
+        params = {}  # Remove params, they are already included in above URL
+
+    return insitu_response
+
+
+def get_query_insitu_func():
+    # Selects the insitu query backend. Currently hard-coded to the edge API; return query_insitu_parquet here to switch.
+    return query_insitu_edge  # TODO upgrade this logic -- how to determine which API to query? Could add a new field to config
 
 
 def query_insitu_schema():
@@ -35,8 +128,7 @@ def query_insitu_schema():
     return response.json()
 
 
-def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max,
-               items_per_page=20000, session=None):
+def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, items_per_page=20000, session=None):
     """
     Query insitu API, page through results, and aggregate
     """
@@ -52,45 +144,23 @@ def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_
         # Assume we were passed a properly formatted string
         pass
 
-    provider = insitu_endpoints.get_provider_name(dataset)
-    project = insitu_endpoints.get_project_name(dataset)
-
     params = {
-        'itemsPerPage': items_per_page,
         'startTime': start_time,
         'endTime': end_time,
         'bbox': bbox,
         'minDepth': depth_min,
         'maxDepth': depth_max,
-        'provider': provider,
-        'project': project,
         'platform': platform,
+        'itemsPerPage': items_per_page
     }
 
     if variable is not None:
         params['variable'] = variable
 
-    insitu_response = {}
+    query_insitu_func = get_query_insitu_func()
 
-    # Page through all insitu results
-    next_page_url = insitu_endpoints.getEndpoint(provider, dataset)
-    while next_page_url is not None and next_page_url != 'NA':
-        if session is not None:
-            response = session.get(next_page_url, params=params)
-        else:
-            response = requests.get(next_page_url, params=params)
-
-        logging.info(f'Insitu request {response.url}')
-
-        response.raise_for_status()
-        insitu_page_response = response.json()
-
-        if not insitu_response:
-            insitu_response = insitu_page_response
-        else:
-            insitu_response['results'].extend(insitu_page_response['results'])
-
-        next_page_url = insitu_page_response.get('next', None)
-        params = {}  # Remove params, they are already included in above URL
-
-    return insitu_response
+    return query_insitu_func(
+        dataset=dataset,
+        params=params,
+        session=session
+    )