You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2024/01/05 23:34:01 UTC
(incubator-sdap-nexus) 01/01: stac catalog
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch SDAP-506
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 32ca3d709237d324decada84fe92a4a4044b6521
Author: skorper <st...@gmail.com>
AuthorDate: Fri Jan 5 15:33:49 2024 -0800
stac catalog
---
.../webservice/algorithms/doms/ResultsStorage.py | 15 +-
analysis/webservice/algorithms/doms/StacCatalog.py | 166 +++++++++++++++++++++
analysis/webservice/algorithms/doms/__init__.py | 1 +
.../webservice/webmodel/NexusExecutionResults.py | 6 +
4 files changed, 180 insertions(+), 8 deletions(-)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 39db27b..6b4cc1c 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -286,7 +286,7 @@ class ResultsRetrieval(AbstractResultsContainer):
execution_id = uuid.UUID(execution_id)
params = self.retrieveParams(execution_id)
- stats = self.__retrieveStats(execution_id)
+ stats = self.retrieveStats(execution_id)
data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size)
return params, stats, data
@@ -357,19 +357,18 @@ class ResultsRetrieval(AbstractResultsContainer):
return entry
- def __retrieveStats(self, id):
- cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1"
+ def retrieveStats(self, id):
+ cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete, num_unique_secondaries FROM doms_execution_stats where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
stats = {
- "timeToComplete": row.time_to_complete,
- "numSecondaryMatched": row.num_insitu_matched,
- "numPrimaryMatched": row.num_gridded_matched,
+ 'timeToComplete': row.time_to_complete,
+ 'numSecondaryMatched': row.num_insitu_matched,
+ 'numPrimaryMatched': row.num_gridded_matched,
+ 'numUniqueSecondaries': row.num_unique_secondaries
}
return stats
- raise NexusProcessingException(reason=f'No stats found for id {str(id)}', code=404)
-
def retrieveParams(self, id):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
diff --git a/analysis/webservice/algorithms/doms/StacCatalog.py b/analysis/webservice/algorithms/doms/StacCatalog.py
new file mode 100644
index 0000000..2c1aa12
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/StacCatalog.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import re
+import uuid
+from typing import List
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusProcessingException
+from webservice.webmodel import NexusResults
+
+from . import BaseDomsHandler
+
+
class StacResults(NexusResults):
    """Result wrapper that hands an already-built STAC document back as JSON."""

    def __init__(self, contents):
        """Store ``contents``, the JSON-serialisable STAC dict to return."""
        super().__init__()
        self.contents = contents

    def toJson(self):
        """Render the stored STAC document as JSON text indented by 4 spaces."""
        document = self.contents
        return json.dumps(document, indent=4)
+
+
@nexus_handler
class StacCatalog(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Serve STAC documents describing the stored results of a CDMS execution.

    Routes (parsed from the request path):

    * ``/cdmscatalog/<execution_id>`` -- a STAC Catalog with one ``child``
      link per supported output format.
    * ``/cdmscatalog/<execution_id>/<format>`` -- a STAC Collection whose
      ``data`` links point at the paged ``/cdmsresults`` downloads for that
      execution and format.
    """
    name = 'STAC Catalog Handler'
    path = '^/cdmscatalog/?.*$'
    description = ''
    params = {}
    singleton = True

    # Output formats offered by the catalog; also the accepted values of the
    # <format> path segment (compared case-insensitively).
    OUTPUT_FORMATS = ('CSV', 'JSON', 'NETCDF')

    # Captures the optional <execution_id> and <format> path segments.
    # Raw string: the former '\/' escapes are invalid in a normal literal.
    _URL_PATH_REGEX = re.compile(
        r'^/cdmscatalog/?(?P<id>[a-zA-Z0-9-]*)/?(?P<format>[a-zA-Z0-9]*)'
    )

    def __init__(self, tile_service_factory, config=None):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
        # Handed to ResultsRetrieval when a collection needs stored stats/params.
        self.config = config

    def construct_catalog(self, execution_id: str, host=None):
        """Build the root STAC Catalog for one execution.

        :param execution_id: Execution id (``str`` or ``uuid.UUID``).
        :param host: Host used to build absolute ``https://`` links. When
            omitted, falls back to ``self.host`` for callers that set it.
        :return: STAC Catalog as a JSON-serialisable dict with one ``child``
            link per entry in ``OUTPUT_FORMATS``.
        """
        if host is None:
            host = self.host

        return {
            'stac_version': '1.0.0',
            'type': 'Catalog',
            'id': str(execution_id),
            'description': 'STAC Catalog for CDMS output',
            'links': [
                {
                    # STAC links a catalog to its sub-catalogs/collections
                    # with rel="child".
                    'rel': 'child',
                    'href': f'https://{host}/cdmscatalog/{execution_id}/{output_format}',
                    'title': f'Collection of pages for {execution_id} {output_format} output',
                    'type': 'application/json'
                }
                for output_format in self.OUTPUT_FORMATS
            ]
        }

    def construct_collection(self, execution_id: str, output_format: str,
                             num_primary_matched: int, page_size: int, start_time: str,
                             end_time: str, bbox: List[float], host=None):
        """Build the STAC Collection listing paged result downloads.

        :param execution_id: Execution id (``str`` or ``uuid.UUID``).
        :param output_format: Output format placed in links and the id.
        :param num_primary_matched: Number of primary matches; together with
            ``page_size`` it determines how many ``data`` links are emitted.
        :param page_size: Matches per page; must be positive.
        :param start_time: Start of the temporal extent (ISO string).
        :param end_time: End of the temporal extent (ISO string).
        :param bbox: Spatial extent as a flat list of floats.
        :param host: Host used to build absolute ``https://`` links. When
            omitted, falls back to ``self.host``.
        :return: STAC Collection as a JSON-serialisable dict.
        :raises ValueError: if ``page_size`` is not positive.
        """
        if host is None:
            host = self.host
        if page_size <= 0:
            raise ValueError(f'page_size must be positive, got {page_size}')

        catalog_url = f'https://{host}/cdmscatalog/{execution_id}'
        links = [
            {
                'rel': 'self',
                'href': f'{catalog_url}/{output_format}',
                'title': 'The current page',
                'type': 'application/json'
            },
            {
                'rel': 'root',
                'href': catalog_url,
                'title': f'Root catalog for {execution_id}',
            }
        ]

        results_url = f'https://{host}/cdmsresults?id={execution_id}&output={output_format}'
        # One 1-based page per started block of page_size matches (ceil division).
        # The old range(1, n, page_size) stepped the page *number* by page_size.
        num_pages = -(-num_primary_matched // page_size)
        links.extend(
            {
                'rel': 'data',
                'href': f'{results_url}&pageNum={page_num}&pageSize={page_size}'
            }
            for page_num in range(1, num_pages + 1)
        )

        return {
            'stac_version': '1.0.0',
            'type': 'Collection',
            'license': 'not-provided',
            'id': f'{execution_id}.{output_format}',
            'description': 'Collection of results for CDMS execution and result format',
            'extent': {
                # STAC requires a list of bboxes and a list of [start, end] intervals.
                'spatial': {
                    'bbox': [bbox]
                },
                'temporal': {
                    'interval': [[start_time, end_time]]
                }
            },
            'links': links,
        }

    def calc(self, request, **args):
        """Route a ``/cdmscatalog`` request to a STAC Catalog or Collection.

        :param request: Incoming request. Its ``pageSize`` argument (default
            1000) sets the page size advertised in Collection data links.
        :return: ``StacResults`` wrapping the STAC document.
        :raises NexusProcessingException: 400 for a missing/invalid execution
            id, an unknown format or a non-positive ``pageSize``; 404 when no
            params are stored for the execution. Exceptions raised by
            ``retrieveParams`` propagate unchanged.
        """
        page_size = request.get_int_arg('pageSize', default=1000)
        # Passed down explicitly instead of stored on self: this handler is a
        # singleton shared by every request.
        host = request.requestHandler.request.host

        match = self._URL_PATH_REGEX.search(request.requestHandler.request.path)
        execution_id = match.group('id') if match else ''
        output_format = match.group('format') if match else ''

        if not execution_id:
            raise NexusProcessingException(
                reason='Execution ID path param must be provided.',
                code=400
            )

        try:
            execution_id = uuid.UUID(execution_id)
        except ValueError:
            raise NexusProcessingException(
                reason=f'"{execution_id}" is not a valid uuid',
                code=400
            ) from None

        if output_format and output_format.upper() not in self.OUTPUT_FORMATS:
            raise NexusProcessingException(
                reason=f'"{output_format}" is not a valid format. Should be CSV, JSON, or NETCDF.',
                code=400
            )

        if not output_format:
            # /cdmscatalog/<id>: catalog of per-format collections.
            return StacResults(self.construct_catalog(execution_id, host=host))

        # /cdmscatalog/<id>/<format>: collection of paged result links.
        # Normalise so ids/links agree with the catalog's upper-case formats.
        output_format = output_format.upper()

        if page_size <= 0:
            raise NexusProcessingException(
                reason=f'pageSize must be a positive integer, got {page_size}.',
                code=400
            )

        with ResultsRetrieval(self.config) as retrieval:
            try:
                execution_stats = retrieval.retrieveStats(execution_id)
            except NexusProcessingException:
                # Missing stats are tolerated: the collection just lists no pages.
                execution_stats = None
            # Fetched outside the try so a stats failure cannot leave it unbound.
            execution_params = retrieval.retrieveParams(execution_id)

        if execution_params is None:
            raise NexusProcessingException(
                reason=f'No params found for id {execution_id}',
                code=404
            )

        # Guard against a None stats dict as well as a None/absent count.
        num_primary_matched = (execution_stats or {}).get('numPrimaryMatched') or 0
        # NOTE(review): isoformat() omits a UTC offset for naive datetimes while
        # STAC expects RFC 3339 timestamps -- confirm stored values are tz-aware.
        start_time = execution_params['startTime'].isoformat()
        end_time = execution_params['endTime'].isoformat()
        bbox = [float(value) for value in execution_params['bbox'].split(',')]

        return StacResults(self.construct_collection(
            execution_id, output_format, num_primary_matched, page_size,
            start_time, end_time, bbox, host=host
        ))
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index bc568f8..7e5715f 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -20,6 +20,7 @@ from . import ExecutionCancel
from . import DatasetListQuery
from . import DomsInitialization
from . import MatchupQuery
+from . import StacCatalog
from . import MetadataQuery
from . import ResultsPlotQuery
from . import ResultsRetrieval
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
index d5c1204..be9d332 100644
--- a/analysis/webservice/webmodel/NexusExecutionResults.py
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -60,6 +60,12 @@ def construct_done(status, created, completed, execution_id, params, host):
('JSON', 'application/json'),
('NETCDF', 'binary/octet-stream')
]
+ job_body['links'].append({
+ 'href': f'{host}/cdmscatalog/{execution_id}',
+ 'title': 'STAC Catalog for execution results',
+ 'type': 'application/json',
+ 'rel': 'stac'
+ })
data_links = [{
'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
'title': 'Download results',