You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/12/22 20:44:36 UTC
(incubator-sdap-nexus) 01/01: insitu DOMS+Parquet support. Pydantic for model normalization
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch SDAP-505
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 3c9aecad8bb18c77404828e2aee600932eba146f
Author: skorper <st...@gmail.com>
AuthorDate: Fri Dec 22 12:44:23 2023 -0800
insitu DOMS+Parquet support. Pydantic for model normalization
---
analysis/tests/algorithms_spark/test_insitu.py | 80 ++++++++++++++
analysis/tests/data/edge_insitu_response.json | 42 ++++++++
analysis/tests/data/parquet_insitu_response.json | 73 +++++++++++++
analysis/webservice/algorithms/doms/config.py | 4 +-
analysis/webservice/algorithms/doms/insitu.py | 132 +++++++++++++++++------
5 files changed, 298 insertions(+), 33 deletions(-)
diff --git a/analysis/tests/algorithms_spark/test_insitu.py b/analysis/tests/algorithms_spark/test_insitu.py
new file mode 100644
index 0000000..b879945
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_insitu.py
@@ -0,0 +1,80 @@
+from webservice.algorithms.doms import insitu
+import json
+
+
+"""
+From Nga:
+
+https://doms.coaps.fsu.edu/ws/search/samos_cdms?startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&itemsPerPage=1
+
+https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida%20State%20University%2C%20COAPS&project=SAMOS&startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&minDepth=-1000&maxDepth=1000
+"""
+
+
+def test_query_insitu():
+    """Run ``insitu.query_insitu`` for SAMOS restricted to platform 30 and print the result.
+
+    No assertions are made. Reference request this call was modelled on
+    (note the call below asks for 1000 items per page, not 20000):
+
+        https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?
+            itemsPerPage=20000
+            startTime=2017-05-01T01:54:45Z
+            endTime=2017-05-01T16:41:01Z
+            bbox=-100.0,20.0,-79.0,30.0
+            minDepth=-20.0
+            maxDepth=10.0
+            provider=Florida+State+University,+COAPS
+            project=SAMOS
+            platform=30
+    """
+    query = {
+        'dataset': 'SAMOS',
+        'variable': None,
+        'start_time': '2017-05-01T01:54:45Z',
+        'end_time': '2017-05-01T16:41:01Z',
+        'bbox': '-100.0,20.0,-79.0,30.0',
+        'platform': 30,
+        'depth_min': -20.0,
+        'depth_max': 10.0,
+        'items_per_page': 1000,
+    }
+    response = insitu.query_insitu(**query)
+
+    # Output is only inspected manually for now.
+    print(f'{response=}')
+
+def test_query_insitu_doms():
+    """Run ``insitu.query_insitu`` for SAMOS without a platform filter and print the result.
+
+    No assertions are made. Reference request noted by the author (its time
+    range and bbox differ from the values used below):
+
+        http://doms.coaps.fsu.edu/ws/search/samos?
+            startTime=2012-08-01T00:00:00Z&
+            endTime=2013-10-31T23:59:59Z&
+            bbox=-45,15,-30,30
+    """
+    query = {
+        'dataset': 'SAMOS',
+        'variable': None,
+        'start_time': '2017-05-01T01:54:45Z',
+        'end_time': '2017-05-01T16:41:01Z',
+        'bbox': '-100.0,20.0,-79.0,30.0',
+        'platform': None,  # 30 seems to reduce the results to 0
+        'depth_min': -20.0,
+        'depth_max': 10.0,
+        'items_per_page': 1000,
+    }
+    response = insitu.query_insitu(**query)
+
+    print(f'{response=}')
+
+
+def test_insitu_result_serialization():
+    """Check that the edge and parquet payload shapes both load into ``insitu.InsituResult``."""
+    from pathlib import Path
+
+    # Resolve the fixtures relative to this file so the test does not depend
+    # on the directory pytest happens to be launched from.
+    data_dir = Path(__file__).resolve().parent.parent / 'data'
+
+    with open(data_dir / 'edge_insitu_response.json') as fp:
+        edge_result = json.load(fp)
+
+    with open(data_dir / 'parquet_insitu_response.json') as fp:
+        parquet_result = json.load(fp)
+
+    parquet_result_model = insitu.InsituResult(**parquet_result)
+    edge_result_model = insitu.InsituResult(**edge_result)
+
+    # The parquet fixture gives platform as {"code": "30"}; the edge fixture gives null.
+    assert parquet_result_model.results[0].platform == 30
+    assert edge_result_model.results[0].platform is None
+
+    # The parquet fixture uses the key "meta", the edge fixture "metadata".
+    assert parquet_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'
+    assert edge_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0'
+
+    # The parquet fixture uses the key "total", the edge fixture "totalResults".
+    assert parquet_result_model.total_results == 4
+    assert edge_result_model.total_results == 4
+
+
+
diff --git a/analysis/tests/data/edge_insitu_response.json b/analysis/tests/data/edge_insitu_response.json
new file mode 100644
index 0000000..7537d78
--- /dev/null
+++ b/analysis/tests/data/edge_insitu_response.json
@@ -0,0 +1,42 @@
+{
+ "last": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=4&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+ "next": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=1&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+ "first": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=0&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z",
+ "results": [
+ {
+ "time": "2018-01-14T17:53:00Z",
+ "latitude": 30,
+ "longitude": -88.43,
+ "depth": -99999,
+ "air_pressure": null,
+ "air_pressure_quality": null,
+ "air_temperature": 4.81,
+ "air_temperature_quality": 1,
+ "dew_point_temperature": null,
+ "dew_point_temperature_quality": null,
+ "downwelling_shortwave_flux_in_air": null,
+ "downwelling_shortwave_flux_in_air_quality": null,
+ "downwelling_longwave_flux_in_air": null,
+ "sea_water_temperature": 17.3,
+ "sea_water_temperature_quality": 1,
+ "eastward_wind": null,
+ "northward_wind": null,
+ "relative_humidity": null,
+ "relative_humidity_quality": null,
+ "sea_water_salinity": 36.02,
+ "sea_water_salinity_quality": 1,
+ "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+ "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+ "wind_speed": null,
+ "wind_speed_quality": null,
+ "wind_component_quality": null,
+ "platform": null,
+ "device": null,
+ "mission": null,
+ "metadata": "WTEO_20180114v30001_0048-99999.0"
+ }
+ ],
+ "totalResults": 4,
+ "startIndex": 0,
+ "itemsPerPage": 1
+}
\ No newline at end of file
diff --git a/analysis/tests/data/parquet_insitu_response.json b/analysis/tests/data/parquet_insitu_response.json
new file mode 100644
index 0000000..0979db7
--- /dev/null
+++ b/analysis/tests/data/parquet_insitu_response.json
@@ -0,0 +1,73 @@
+{
+ "total": 4,
+ "results": [
+ {
+ "time": "2018-01-14T17:53:00Z",
+ "latitude": 30,
+ "longitude": -88.43,
+ "depth": -99999,
+ "air_pressure": null,
+ "air_pressure_quality": null,
+ "air_temperature": 4.81,
+ "air_temperature_quality": 1,
+ "dew_point_temperature": null,
+ "dew_point_temperature_quality": null,
+ "downwelling_longwave_flux_in_air": null,
+ "downwelling_longwave_flux_in_air_quality": null,
+ "downwelling_longwave_radiance_in_air": null,
+ "downwelling_longwave_radiance_in_air_quality": null,
+ "downwelling_shortwave_flux_in_air": null,
+ "downwelling_shortwave_flux_in_air_quality": null,
+ "mass_concentration_of_chlorophyll_in_sea_water": null,
+ "mass_concentration_of_chlorophyll_in_sea_water_quality": null,
+ "rainfall_rate": null,
+ "rainfall_rate_quality": null,
+ "relative_humidity": null,
+ "relative_humidity_quality": null,
+ "sea_surface_salinity": null,
+ "sea_surface_salinity_quality": null,
+ "sea_surface_skin_temperature": null,
+ "sea_surface_skin_temperature_quality": null,
+ "sea_surface_subskin_temperature": null,
+ "sea_surface_subskin_temperature_quality": null,
+ "sea_surface_temperature": null,
+ "sea_surface_temperature_quality": null,
+ "sea_water_density": null,
+ "sea_water_density_quality": null,
+ "sea_water_electrical_conductivity": null,
+ "sea_water_electrical_conductivity_quality": null,
+ "sea_water_practical_salinity": null,
+ "sea_water_practical_salinity_quality": null,
+ "sea_water_salinity": 36.02,
+ "sea_water_salinity_quality": 1,
+ "sea_water_temperature": 17.3,
+ "sea_water_temperature_quality": 1,
+ "surface_downwelling_photosynthetic_photon_flux_in_air": null,
+ "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null,
+ "wet_bulb_temperature": null,
+ "wet_bulb_temperature_quality": null,
+ "wind_speed": null,
+ "wind_speed_quality": null,
+ "wind_from_direction": null,
+ "wind_to_direction": null,
+ "wind_from_direction_quality": null,
+ "wind_to_direction_quality": null,
+ "eastward_wind": null,
+ "northward_wind": null,
+ "wind_component_quality": null,
+ "platform": {
+ "code": "30"
+ },
+ "device": null,
+ "meta": "WTEO_20180114v30001_0048-99999.0",
+ "provider": "Florida State University, COAPS",
+ "project": "SAMOS",
+ "platform_code": "30",
+ "job_id": "47a61aac-6bc2-41a2-9568-920715c65b66"
+ }
+ ],
+ "last": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=3&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+ "first": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+ "next": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=1&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000",
+ "prev": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000"
+}
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
index 8ccc748..f16c1ed 100644
--- a/analysis/webservice/algorithms/doms/config.py
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -103,7 +103,7 @@ INSITU_PROVIDER_MAP = [
ENDPOINTS = [
{
"name": "samos",
- "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+ "url": "https://doms.coaps.fsu.edu/ws/search/samos",
"fetchParallel": True,
"fetchThreads": 8,
"itemsPerPage": 1000,
@@ -150,7 +150,7 @@ try:
ENDPOINTS = [
{
"name": "samos",
- "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms",
+ "url": "https://doms.coaps.fsu.edu/ws/search/samos",
"fetchParallel": True,
"fetchThreads": 8,
"itemsPerPage": 1000,
diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py
index ae35b4a..55cbe1c 100644
--- a/analysis/webservice/algorithms/doms/insitu.py
+++ b/analysis/webservice/algorithms/doms/insitu.py
@@ -20,6 +20,99 @@ import logging
import requests
from datetime import datetime
from webservice.algorithms.doms import config as insitu_endpoints
+from pydantic import BaseModel, Field, AliasChoices, ConfigDict, field_validator
+from typing import List, Optional, Dict, Union
+
+
+class InsituRecord(BaseModel):
+    """A single insitu observation, normalized across the supported response shapes.
+
+    Keys that are not declared below are retained (``extra='allow'``) and can
+    be read back through :meth:`get_variables`.
+    """
+    time: str
+    latitude: float
+    longitude: float
+    depth: float
+    platform: Optional[int] = None
+    # Explicit default: in pydantic v2 an ``Optional[...]`` annotation without
+    # a default is still a required field.
+    device: Optional[float] = None
+    mission: Optional[str] = None
+    # Populated from either the 'meta' or the 'metadata' input key.
+    metadata: str = Field(validation_alias=AliasChoices('meta', 'metadata'))
+    provider: Optional[str] = None
+    project: Optional[str] = None
+    platform_code: Optional[str] = None
+    job_id: Optional[str] = None
+
+    model_config = ConfigDict(
+        extra='allow',
+    )
+
+    @field_validator('platform', mode='before')
+    @classmethod
+    def transform(
+        cls, raw_platform: Union[int, str, Dict[str, str], None]
+    ) -> Union[int, str, None]:
+        """Unwrap a ``{'code': ...}`` platform object to its code before validation.
+
+        Any non-dict value (including ``None``) is passed through unchanged;
+        the regular ``Optional[int]`` field validation runs afterwards.
+        """
+        if isinstance(raw_platform, dict):
+            return raw_platform.get('code')
+        return raw_platform
+
+    def get_variables(self):
+        """Return the extra (undeclared) fields captured for this record."""
+        return self.model_extra
+
+
+class InsituResult(BaseModel):
+    """Insitu query response: pagination links, total count and the records."""
+    # Pagination links are optional because a response may omit some of them.
+    # The default must be None, not the ``str`` type object, which would
+    # otherwise be stored as the field value when a link is missing.
+    last: Optional[str] = None
+    prev: Optional[str] = None
+    next: Optional[str] = None
+    first: Optional[str] = None
+    # Populated from either the 'total' or the 'totalResults' input key.
+    total_results: int = Field(validation_alias=AliasChoices('total', 'totalResults'))
+    results: List[InsituRecord]
+
+
+def query_insitu_edge(dataset, params, session=None, stats=True):
+    """Query the endpoint registered under the name ``dataset`` and return the paged result.
+
+    ``params`` is modified in place: a ``stats`` entry holding the lower-cased
+    string form of ``stats`` is added before the request is issued.
+    """
+    params['stats'] = str(stats).lower()
+    endpoint_url = insitu_endpoints.getEndpointByName(dataset)['url']
+
+    # TODO convert to shared entity
+    return query_insitu_api(session, endpoint_url, params)
+
+
+def query_insitu_parquet(dataset, params, session=None, **kwargs):
+    """Query the provider/project endpoint for ``dataset`` and return the paged result.
+
+    ``params`` is modified in place: the ``provider`` and ``project`` looked up
+    for ``dataset`` are added before the request is issued. Extra keyword
+    arguments are accepted and ignored.
+    """
+    provider = insitu_endpoints.get_provider_name(dataset)
+    project = insitu_endpoints.get_project_name(dataset)
+    params.update(provider=provider, project=project)
+
+    endpoint_url = insitu_endpoints.getEndpoint(provider, dataset)
+
+    # TODO convert to shared entity
+    return query_insitu_api(session, endpoint_url, params)
+
+
+def query_insitu_api(session, endpoint, params):
+    """Fetch every page of insitu results starting at ``endpoint`` and merge them.
+
+    :param session: optional object exposing ``get(url, params=...)``;
+        ``requests.get`` is used when this is None
+    :param endpoint: URL of the first page
+    :param params: query parameters sent with the first request only
+    :return: the first page's JSON with the ``results`` of every following
+        page appended; an empty dict if no request was made
+    :raises: whatever ``response.raise_for_status()`` raises on a failed request
+    """
+    http_get = session.get if session is not None else requests.get
+    insitu_response = {}
+
+    # Page through all insitu results; paging stops when a page has no
+    # 'next' link or the link is the literal 'NA'.
+    next_page_url = endpoint
+    while next_page_url is not None and next_page_url != 'NA':
+        response = http_get(next_page_url, params=params)
+        logging.info(f'Insitu request {response.url}')
+
+        response.raise_for_status()
+        insitu_page_response = response.json()
+
+        if not insitu_response:
+            insitu_response = insitu_page_response
+        else:
+            insitu_response['results'].extend(insitu_page_response['results'])
+
+        next_page_url = insitu_page_response.get('next', None)
+        params = {}  # Remove params, they are already included in above URL
+
+    return insitu_response
+
+
+def get_query_insitu_func():
+    """Return the function used to query the insitu API (currently always ``query_insitu_edge``)."""
+    # TODO upgrade this logic -- how to determine which API to query
+    # (query_insitu_parquet or query_insitu_edge)? Could add a new field to config
+    return query_insitu_edge
def query_insitu_schema():
@@ -35,8 +128,7 @@ def query_insitu_schema():
return response.json()
-def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max,
- items_per_page=20000, session=None):
+def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, items_per_page=20000, session=None):
"""
Query insitu API, page through results, and aggregate
"""
@@ -52,45 +144,23 @@ def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_
# Assume we were passed a properly formatted string
pass
- provider = insitu_endpoints.get_provider_name(dataset)
- project = insitu_endpoints.get_project_name(dataset)
-
params = {
- 'itemsPerPage': items_per_page,
'startTime': start_time,
'endTime': end_time,
'bbox': bbox,
'minDepth': depth_min,
'maxDepth': depth_max,
- 'provider': provider,
- 'project': project,
'platform': platform,
+ 'itemsPerPage': items_per_page
}
if variable is not None:
params['variable'] = variable
- insitu_response = {}
+ query_insitu_func = get_query_insitu_func()
- # Page through all insitu results
- next_page_url = insitu_endpoints.getEndpoint(provider, dataset)
- while next_page_url is not None and next_page_url != 'NA':
- if session is not None:
- response = session.get(next_page_url, params=params)
- else:
- response = requests.get(next_page_url, params=params)
-
- logging.info(f'Insitu request {response.url}')
-
- response.raise_for_status()
- insitu_page_response = response.json()
-
- if not insitu_response:
- insitu_response = insitu_page_response
- else:
- insitu_response['results'].extend(insitu_page_response['results'])
-
- next_page_url = insitu_page_response.get('next', None)
- params = {} # Remove params, they are already included in above URL
-
- return insitu_response
+ return query_insitu_func(
+ dataset=dataset,
+ params=params,
+ session=session
+ )