You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/12/22 20:44:35 UTC

(incubator-sdap-nexus) branch SDAP-505 created (now 3c9aeca)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a change to branch SDAP-505
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


      at 3c9aeca  insitu DOMS+Parquet support. Pydantic for model normalization

This branch includes the following new commits:

     new 3c9aeca  insitu DOMS+Parquet support. Pydantic for model normalization

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(incubator-sdap-nexus) 01/01: insitu DOMS+Parquet support. Pydantic for model normalization

Posted by sk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-505
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 3c9aecad8bb18c77404828e2aee600932eba146f
Author: skorper <st...@gmail.com>
AuthorDate: Fri Dec 22 12:44:23 2023 -0800

    insitu DOMS+Parquet support. Pydantic for model normalization
---
 analysis/tests/algorithms_spark/test_insitu.py   |  80 ++++++++++++++
 analysis/tests/data/edge_insitu_response.json    |  42 ++++++++
 analysis/tests/data/parquet_insitu_response.json |  73 +++++++++++++
 analysis/webservice/algorithms/doms/config.py    |   4 +-
 analysis/webservice/algorithms/doms/insitu.py    | 132 +++++++++++++++++------
 5 files changed, 298 insertions(+), 33 deletions(-)

diff --git a/analysis/tests/algorithms_spark/test_insitu.py b/analysis/tests/algorithms_spark/test_insitu.py
new file mode 100644
index 0000000..b879945
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_insitu.py
@@ -0,0 +1,80 @@
+import json
+
+from webservice.algorithms.doms import insitu
+
+# Reference queries from Nga for one SAMOS window (2018-01-14 17:00-17:53Z, bbox -89,29,-88,30).
+# Their itemsPerPage=1 responses appear to correspond to data/edge_insitu_response.json (FSU EDGE
+# service) and data/parquet_insitu_response.json (JPL Parquet insitu API):
+#
+# https://doms.coaps.fsu.edu/ws/search/samos_cdms?startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&itemsPerPage=1
+#
+# https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida%20State%20University%2C%20COAPS&project=SAMOS&startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&minDepth=-1000&maxDepth=1000
+
+
+def test_query_insitu():
+    # Roughly equivalent Parquet-API request (query_insitu itself routes via get_query_insitu_func):
+    # https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?
+    # itemsPerPage=20000
+    # startTime=2017-05-01T01:54:45Z
+    # endTime=2017-05-01T16:41:01Z
+    # bbox=-100.0,20.0,-79.0,30.0
+    # minDepth=-20.0
+    # maxDepth=10.0
+    # provider=Florida+State+University,+COAPS
+    # project=SAMOS
+    # platform=30
+    response = insitu.query_insitu(
+        dataset='SAMOS',
+        variable=None,
+        start_time='2017-05-01T01:54:45Z',
+        end_time='2017-05-01T16:41:01Z',
+        bbox='-100.0,20.0,-79.0,30.0',
+        platform=30,
+        depth_min=-20.0,
+        depth_max=10.0,
+        items_per_page=1000
+    )
+
+    assert isinstance(response.get('results'), list)  # NOTE: hits the live insitu service
+
+def test_query_insitu_doms():
+    # Same time/bbox/depth window as test_query_insitu but with no platform filter. With the
+    # current EDGE routing this presumably requests something like:
+    # https://doms.coaps.fsu.edu/ws/search/samos?startTime=2017-05-01T01:54:45Z&endTime=2017-05-01T16:41:01Z
+    #     &bbox=-100.0,20.0,-79.0,30.0&minDepth=-20.0&maxDepth=10.0&itemsPerPage=1000&stats=true
+    response = insitu.query_insitu(
+        dataset='SAMOS',
+        variable=None,
+        start_time='2017-05-01T01:54:45Z',
+        end_time='2017-05-01T16:41:01Z',
+        bbox='-100.0,20.0,-79.0,30.0',
+        platform=None,  # 30 seems to reduce the results to 0
+        depth_min=-20.0,
+        depth_max=10.0,
+        items_per_page=1000
+    )
+
+    assert isinstance(response.get('results'), list)  # NOTE: hits the live insitu service
+
+
+def test_insitu_result_serialization():
+    """EDGE and Parquet payloads for the same record must normalize to identical InsituResult fields."""
+    import pathlib
+    # Resolve fixtures relative to this file, not the current working directory
+    data_dir = pathlib.Path(__file__).resolve().parent.parent / 'data'
+    with open(data_dir / 'edge_insitu_response.json') as fp:
+        edge_result = json.load(fp)
+    with open(data_dir / 'parquet_insitu_response.json') as fp:
+        parquet_result = json.load(fp)
+
+    parquet_result_model = insitu.InsituResult(**parquet_result)
+    edge_result_model = insitu.InsituResult(**edge_result)
+
+    assert parquet_result_model.results[0].platform == 30  # Parquet's {'code': '30'} is flattened to 30
+    assert edge_result_model.results[0].platform is None
+
+    assert parquet_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'  # from 'meta'
+    assert edge_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'
+
+    assert parquet_result_model.total_results == 4  # from 'total' (EDGE uses 'totalResults')
+    assert edge_result_model.total_results == 4
diff --git a/analysis/tests/data/edge_insitu_response.json b/analysis/tests/data/edge_insitu_response.json
new file mode 100644
index 0000000..7537d78
--- /dev/null
+++ b/analysis/tests/data/edge_insitu_response.json
@@ -0,0 +1,42 @@
+{
+  "last": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=4&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "next": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=1&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "first": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=0&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+  "results": [
+    {
+      "time": "2018-01-14T17:53:00Z",
+      "latitude": 30,
+      "longitude": -88.43,
+      "depth": -99999,
+      "air_pressure": null,
+      "air_pressure_quality": null,
+      "air_temperature": 4.81,
+      "air_temperature_quality": 1,
+      "dew_point_temperature": null,
+      "dew_point_temperature_quality": null,
+      "downwelling_shortwave_flux_in_air": null,
+      "downwelling_shortwave_flux_in_air_quality": null,
+      "downwelling_longwave_flux_in_air": null,
+      "sea_water_temperature": 17.3,
+      "sea_water_temperature_quality": 1,
+      "eastward_wind": null,
+      "northward_wind": null,
+      "relative_humidity": null,
+      "relative_humidity_quality": null,
+      "sea_water_salinity": 36.02,
+      "sea_water_salinity_quality": 1,
+      "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+      "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+      "wind_speed": null,
+      "wind_speed_quality": null,
+      "wind_component_quality": null,
+      "platform": null,
+      "device": null,
+      "mission": null,
+      "metadata": "WTEO_20180114v30001_0048-99999.0"
+    }
+  ],
+  "totalResults": 4,
+  "startIndex": 0,
+  "itemsPerPage": 1
+}
\ No newline at end of file
diff --git a/analysis/tests/data/parquet_insitu_response.json b/analysis/tests/data/parquet_insitu_response.json
new file mode 100644
index 0000000..0979db7
--- /dev/null
+++ b/analysis/tests/data/parquet_insitu_response.json
@@ -0,0 +1,73 @@
+{
+  "total": 4,
+  "results": [
+    {
+      "time": "2018-01-14T17:53:00Z",
+      "latitude": 30,
+      "longitude": -88.43,
+      "depth": -99999,
+      "air_pressure": null,
+      "air_pressure_quality": null,
+      "air_temperature": 4.81,
+      "air_temperature_quality": 1,
+      "dew_point_temperature": null,
+      "dew_point_temperature_quality": null,
+      "downwelling_longwave_flux_in_air": null,
+      "downwelling_longwave_flux_in_air_quality": null,
+      "downwelling_longwave_radiance_in_air": null,
+      "downwelling_longwave_radiance_in_air_quality": null,
+      "downwelling_shortwave_flux_in_air": null,
+      "downwelling_shortwave_flux_in_air_quality": null,
+      "mass_concentration_of_chlorophyll_in_sea_water": null,
+      "mass_concentration_of_chlorophyll_in_sea_water_quality": null,
+      "rainfall_rate": null,
+      "rainfall_rate_quality": null,
+      "relative_humidity": null,
+      "relative_humidity_quality": null,
+      "sea_surface_salinity": null,
+      "sea_surface_salinity_quality": null,
+      "sea_surface_skin_temperature": null,
+      "sea_surface_skin_temperature_quality": null,
+      "sea_surface_subskin_temperature": null,
+      "sea_surface_subskin_temperature_quality": null,
+      "sea_surface_temperature": null,
+      "sea_surface_temperature_quality": null,
+      "sea_water_density": null,
+      "sea_water_density_quality": null,
+      "sea_water_electrical_conductivity": null,
+      "sea_water_electrical_conductivity_quality": null,
+      "sea_water_practical_salinity": null,
+      "sea_water_practical_salinity_quality": null,
+      "sea_water_salinity": 36.02,
+      "sea_water_salinity_quality": 1,
+      "sea_water_temperature": 17.3,
+      "sea_water_temperature_quality": 1,
+      "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+      "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+      "wet_bulb_temperature": null,
+      "wet_bulb_temperature_quality": null,
+      "wind_speed": null,
+      "wind_speed_quality": null,
+      "wind_from_direction": null,
+      "wind_to_direction": null,
+      "wind_from_direction_quality": null,
+      "wind_to_direction_quality": null,
+      "eastward_wind": null,
+      "northward_wind": null,
+      "wind_component_quality": null,
+      "platform": {
+        "code": "30"
+      },
+      "device": null,
+      "meta": "WTEO_20180114v30001_0048-99999.0",
+      "provider": "Florida State University, COAPS",
+      "project": "SAMOS",
+      "platform_code": "30",
+      "job_id": "47a61aac-6bc2-41a2-9568-920715c65b66"
+    }
+  ],
+  "last": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=3&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "first": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "next": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=1&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+  "prev": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000"
+}
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
index 8ccc748..f16c1ed 100644
--- a/analysis/webservice/algorithms/doms/config.py
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -103,7 +103,7 @@ INSITU_PROVIDER_MAP = [
 ENDPOINTS = [
     {
         "name": "samos",
-        "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+        "url": "https://doms.coaps.fsu.edu/ws/search/samos",
         "fetchParallel": True,
         "fetchThreads": 8,
         "itemsPerPage": 1000,
@@ -150,7 +150,7 @@ try:
         ENDPOINTS = [
             {
                 "name": "samos",
-                "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+                "url": "https://doms.coaps.fsu.edu/ws/search/samos",
                 "fetchParallel": True,
                 "fetchThreads": 8,
                 "itemsPerPage": 1000,
diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py
index ae35b4a..55cbe1c 100644
--- a/analysis/webservice/algorithms/doms/insitu.py
+++ b/analysis/webservice/algorithms/doms/insitu.py
@@ -20,6 +20,99 @@ import logging
 import requests
 from datetime import datetime
 from webservice.algorithms.doms import config as insitu_endpoints
+from pydantic import BaseModel, Field, AliasChoices, ConfigDict, field_validator
+from typing import List, Optional, Dict, Union
+
+
+class InsituRecord(BaseModel):  # one insitu observation, normalized across the EDGE and Parquet formats
+    time: str
+    latitude: float
+    longitude: float
+    depth: float
+    platform: Optional[int] = None  # Parquet sends {'code': '30'}; flattened by the validator below
+    device: Optional[float] = None  # explicit default: Optional[...] alone is still required in pydantic v2
+    mission: Optional[str] = None
+    metadata: str = Field(validation_alias=AliasChoices('meta', 'metadata'))  # Parquet: 'meta', EDGE: 'metadata'
+    provider: Optional[str] = None
+    project: Optional[str] = None
+    platform_code: Optional[str] = None
+    job_id: Optional[str] = None
+
+    model_config = ConfigDict(
+        extra='allow',  # keep undeclared keys (measurement values and *_quality flags) as extra fields
+    )
+
+    @field_validator('platform', mode='before')
+    @classmethod
+    def transform(cls, raw_platform: Union[int, Dict[str, str], None]) -> Optional[Union[int, str]]:
+        if isinstance(raw_platform, dict):  # Parquet shape, e.g. {'code': '30'}
+            return raw_platform.get('code')
+        return raw_platform  # EDGE shape (scalar or null); pydantic then coerces the value to int
+
+    def get_variables(self):  # the extra (undeclared) fields collected via extra='allow'
+        return self.model_extra
+
+
+class InsituResult(BaseModel):  # one insitu API response page (EDGE or Parquet format)
+    last: Optional[str] = None  # pagination links are optional; e.g. EDGE responses carry no 'prev'
+    prev: Optional[str] = None
+    next: Optional[str] = None
+    first: Optional[str] = None
+    total_results: int = Field(validation_alias=AliasChoices('total', 'totalResults'))  # Parquet: 'total', EDGE: 'totalResults'
+    results: List[InsituRecord]
+
+
+def query_insitu_edge(dataset, params, session=None, stats=True):  # query the endpoint registered as `dataset` in doms.config
+    # Build a new dict instead of params.update() so the caller's params are not mutated
+    edge_params = {**params, 'stats': str(stats).lower()}
+    endpoint_url = insitu_endpoints.getEndpointByName(dataset)['url']
+
+    return query_insitu_api(session, endpoint_url, edge_params)  # TODO convert to shared entity
+
+
+def query_insitu_parquet(dataset, params, session=None, **kwargs):  # query the Parquet insitu API; extra kwargs are ignored
+    provider = insitu_endpoints.get_provider_name(dataset)
+    project = insitu_endpoints.get_project_name(dataset)
+
+    # Build a new dict instead of params.update() so the caller's params are not mutated
+    parquet_params = {
+        **params, 'provider': provider, 'project': project,
+    }
+
+    return query_insitu_api(session, insitu_endpoints.getEndpoint(provider, dataset), parquet_params)  # TODO convert to shared entity
+
+
+def query_insitu_api(session, endpoint, params):
+    """Query an insitu API endpoint and aggregate every page of results.
+
+    Follows each page's 'next' link until it is missing, None, or 'NA', appending later pages'
+    'results' to the first page's JSON (returned with its other keys unchanged). Uses `session`
+    for the GET requests when given, otherwise `requests.get`; HTTP errors are raised.
+    """
+    http_get = session.get if session is not None else requests.get
+    insitu_response = {}
+    next_page_url = endpoint
+    while next_page_url is not None and next_page_url != 'NA':
+        response = http_get(next_page_url, params=params)
+        logging.info('Insitu request %s', response.url)
+
+        response.raise_for_status()
+        insitu_page_response = response.json()
+
+        if not insitu_response:
+            insitu_response = insitu_page_response
+        else:
+            insitu_response['results'].extend(insitu_page_response['results'])
+
+        next_page_url = insitu_page_response.get('next', None)
+        params = {}  # the 'next' URL already carries the query string, so don't resend params
+
+    return insitu_response
+
+
+def get_query_insitu_func():  # pick the backend used by query_insitu(); currently always the EDGE-style API
+    # TODO upgrade this logic -- decide which API to query (e.g. via a new config field) so query_insitu_parquet can be chosen
+    return query_insitu_edge
 
 
 def query_insitu_schema():
@@ -35,8 +128,7 @@ def query_insitu_schema():
     return response.json()
 
 
-def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max,
-               items_per_page=20000, session=None):
+def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, items_per_page=20000, session=None):
     """
     Query insitu API, page through results, and aggregate
     """
@@ -52,45 +144,23 @@ def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_
         # Assume we were passed a properly formatted string
         pass
 
-    provider = insitu_endpoints.get_provider_name(dataset)
-    project = insitu_endpoints.get_project_name(dataset)
-
     params = {
-        'itemsPerPage': items_per_page,
         'startTime': start_time,
         'endTime': end_time,
         'bbox': bbox,
         'minDepth': depth_min,
         'maxDepth': depth_max,
-        'provider': provider,
-        'project': project,
         'platform': platform,
+        'itemsPerPage': items_per_page
     }
 
     if variable is not None:
         params['variable'] = variable
 
-    insitu_response = {}
+    query_insitu_func = get_query_insitu_func()
 
-    # Page through all insitu results
-    next_page_url = insitu_endpoints.getEndpoint(provider, dataset)
-    while next_page_url is not None and next_page_url != 'NA':
-        if session is not None:
-            response = session.get(next_page_url, params=params)
-        else:
-            response = requests.get(next_page_url, params=params)
-
-        logging.info(f'Insitu request {response.url}')
-
-        response.raise_for_status()
-        insitu_page_response = response.json()
-
-        if not insitu_response:
-            insitu_response = insitu_page_response
-        else:
-            insitu_response['results'].extend(insitu_page_response['results'])
-
-        next_page_url = insitu_page_response.get('next', None)
-        params = {}  # Remove params, they are already included in above URL
-
-    return insitu_response
+    return query_insitu_func(
+        dataset=dataset,
+        params=params,
+        session=session
+    )