You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2024/01/05 23:34:01 UTC

(incubator-sdap-nexus) 01/01: stac catalog

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-506
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 32ca3d709237d324decada84fe92a4a4044b6521
Author: skorper <st...@gmail.com>
AuthorDate: Fri Jan 5 15:33:49 2024 -0800

    stac catalog
---
 .../webservice/algorithms/doms/ResultsStorage.py   |  15 +-
 analysis/webservice/algorithms/doms/StacCatalog.py | 166 +++++++++++++++++++++
 analysis/webservice/algorithms/doms/__init__.py    |   1 +
 .../webservice/webmodel/NexusExecutionResults.py   |   6 +
 4 files changed, 180 insertions(+), 8 deletions(-)

diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 39db27b..6b4cc1c 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -286,7 +286,7 @@ class ResultsRetrieval(AbstractResultsContainer):
             execution_id = uuid.UUID(execution_id)
 
         params = self.retrieveParams(execution_id)
-        stats = self.__retrieveStats(execution_id)
+        stats = self.retrieveStats(execution_id)
         data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size)
         return params, stats, data
 
@@ -357,19 +357,28 @@ class ResultsRetrieval(AbstractResultsContainer):
 
         return entry
 
-    def __retrieveStats(self, id):
-        cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1"
+    def retrieveStats(self, id):
+        """Return the stored match statistics for execution ``id``.
+
+        :return: dict with timeToComplete, numSecondaryMatched,
+            numPrimaryMatched and numUniqueSecondaries, or None if no
+            doms_execution_stats row exists for ``id``.
+        """
+        cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete, num_unique_secondaries FROM doms_execution_stats where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
             stats = {
-                "timeToComplete": row.time_to_complete,
-                "numSecondaryMatched": row.num_insitu_matched,
-                "numPrimaryMatched": row.num_gridded_matched,
+                'timeToComplete': row.time_to_complete,
+                'numSecondaryMatched': row.num_insitu_matched,
+                'numPrimaryMatched': row.num_gridded_matched,
+                'numUniqueSecondaries': row.num_unique_secondaries
             }
             return stats
 
-        raise NexusProcessingException(reason=f'No stats found for id {str(id)}', code=404)
-
+        # No stats row for this id: return None explicitly (callers such as
+        # retrieve() receive it as the stats value).
+        return None
+
     def retrieveParams(self, id):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
diff --git a/analysis/webservice/algorithms/doms/StacCatalog.py b/analysis/webservice/algorithms/doms/StacCatalog.py
new file mode 100644
index 0000000..2c1aa12
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/StacCatalog.py
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import math
+import re
+import uuid
+from typing import List, Optional
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusProcessingException
+from webservice.webmodel import NexusResults
+
+from . import BaseDomsHandler
+
+# Result formats served by /cdmsresults; each one gets its own STAC collection.
+OUTPUT_FORMATS = ['CSV', 'JSON', 'NETCDF']
+
+# Matches /cdmscatalog[/<execution id>[/<output format>]]. Both segments are
+# optional here; StacCatalog.calc validates them.
+URL_PATH_REGEX = re.compile(
+    r'^/cdmscatalog/?(?P<id>[a-zA-Z0-9-]*)/?(?P<format>[a-zA-Z0-9]*)'
+)
+
+
+class StacResults(NexusResults):
+    """Result wrapper that renders a STAC document (a dict) as JSON."""
+
+    def __init__(self, contents):
+        NexusResults.__init__(self)
+        self.contents = contents
+
+    def toJson(self):
+        """Serialize the wrapped STAC document as 4-space-indented JSON."""
+        return json.dumps(self.contents, indent=4)
+
+
+@nexus_handler
+class StacCatalog(BaseDomsHandler.BaseDomsQueryCalcHandler):
+    """Serve STAC metadata describing the stored results of a CDMS execution.
+
+    ``/cdmscatalog/<id>`` returns a STAC Catalog linking to one collection per
+    output format; ``/cdmscatalog/<id>/<format>`` returns a STAC Collection
+    whose ``data`` links are the pages of ``/cdmsresults`` for that format.
+    """
+    name = 'STAC Catalog Handler'
+    path = '^/cdmscatalog/?.*$'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, tile_service_factory, config=None):
+        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+        self.config = config
+
+    def _resolve_host(self, host: Optional[str]) -> Optional[str]:
+        """Return ``host``, falling back to a legacy ``self.host`` attribute if set."""
+        return host if host is not None else getattr(self, 'host', None)
+
+    def construct_catalog(self, execution_id: str, host: Optional[str] = None):
+        """Build the STAC Catalog for one execution.
+
+        :param execution_id: execution UUID (or its string form).
+        :param host: request host used to build absolute ``https://`` links.
+        :return: STAC Catalog dict with one ``collection`` link per output format.
+        """
+        host = self._resolve_host(host)
+        return {
+            'stac_version': '1.0.0',
+            'type': 'Catalog',
+            'id': str(execution_id),
+            'description': 'STAC Catalog for CDMS output',
+            'links': [
+                {
+                    'rel': 'collection',
+                    'href': f'https://{host}/cdmscatalog/{execution_id}/{output_format}',
+                    'title': f'Collection of pages for {execution_id} {output_format} output'
+                }
+                for output_format in OUTPUT_FORMATS
+            ]
+        }
+
+    def construct_collection(self, execution_id: str, output_format: str,
+                             num_primary_matched: int, page_size: int, start_time: str,
+                             end_time: str, bbox: List[float], host: Optional[str] = None):
+        """Build the STAC Collection for one execution and output format.
+
+        :param num_primary_matched: number of primary matches; together with
+            ``page_size`` it determines how many result pages exist.
+        :param page_size: number of primary matches per ``/cdmsresults`` page.
+        :param start_time: ISO-8601 start of the temporal extent.
+        :param end_time: ISO-8601 end of the temporal extent.
+        :param bbox: execution bounding box, in the order stored in its params.
+        :param host: request host used to build absolute ``https://`` links.
+        :return: STAC Collection dict with one ``data`` link per result page.
+        :raises ValueError: if ``page_size`` is not positive.
+        """
+        if page_size < 1:
+            raise ValueError(f'page_size must be a positive integer, got {page_size}')
+
+        host = self._resolve_host(host)
+        links = [
+            {
+                'rel': 'self',
+                'href': f'https://{host}/cdmscatalog/{execution_id}/{output_format}',
+                'title': 'The current page',
+                'type': 'application/json'
+            },
+            {
+                'rel': 'root',
+                'href': f'https://{host}/cdmscatalog/{execution_id}',
+                'title': f'Root catalog for {execution_id}',
+            }
+        ]
+
+        # pageNum is 1-based and counts pages, not records: N matches split
+        # into pages of page_size need ceil(N / page_size) pages.
+        url = f'https://{host}/cdmsresults?id={execution_id}&output={output_format}'
+        num_pages = math.ceil(max(num_primary_matched, 0) / page_size)
+        links.extend(
+            {
+                'rel': 'data',
+                'href': f'{url}&pageNum={page_num}&pageSize={page_size}'
+            }
+            for page_num in range(1, num_pages + 1)
+        )
+
+        return {
+            'stac_version': '1.0.0',
+            'type': 'Collection',
+            'license': 'not-provided',
+            'id': f'{execution_id}.{output_format}',
+            'description': 'Collection of results for CDMS execution and result format',
+            'extent': {
+                'spatial': {
+                    # STAC Collections require a list of bounding boxes.
+                    'bbox': [bbox]
+                },
+                'temporal': {
+                    # STAC Collections require a list of [start, end] intervals.
+                    'interval': [[start_time, end_time]]
+                }
+            },
+            'links': links,
+        }
+
+    def calc(self, request, **args):
+        """Route a ``/cdmscatalog`` request to the catalog or collection builder.
+
+        :raises NexusProcessingException: code 400 for a non-positive pageSize
+            or a missing/invalid execution id or output format; code 404 if
+            ``retrieveParams`` returns an empty result. Errors raised by
+            ``retrieveParams`` itself propagate unchanged.
+        """
+        page_size = request.get_int_arg('pageSize', default=1000)
+        if page_size < 1:
+            raise NexusProcessingException(
+                reason=f'pageSize must be a positive integer, got {page_size}',
+                code=400
+            )
+
+        # `path` only routes /cdmscatalog requests here, so a match is expected;
+        # fall back to empty segments rather than crash on a None match.
+        match = URL_PATH_REGEX.search(request.requestHandler.request.path)
+        execution_id = match.group('id') if match else ''
+        output_format = match.group('format') if match else ''
+
+        # Keep the host in a local: this handler is a singleton, so storing it on
+        # `self` could let concurrent requests see each other's host.
+        host = request.requestHandler.request.host
+
+        if not execution_id:
+            raise NexusProcessingException(
+                reason='Execution ID path param must be provided.',
+                code=400
+            )
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(
+                reason=f'"{execution_id}" is not a valid uuid',
+                code=400
+            )
+
+        if not output_format:
+            # /cdmscatalog/<id>: catalog listing one collection per output format.
+            return StacResults(self.construct_catalog(execution_id, host=host))
+
+        if output_format.upper() not in OUTPUT_FORMATS:
+            raise NexusProcessingException(
+                reason=f'"{output_format}" is not a valid format. Should be CSV, JSON, or NETCDF.',
+                code=400
+            )
+        # Normalise so ids and links use the same upper-case names as the catalog.
+        output_format = output_format.upper()
+
+        with ResultsRetrieval(self.config) as retrieval:
+            # Params supply the extent, so failures here are not swallowed.
+            execution_params = retrieval.retrieveParams(execution_id)
+            try:
+                execution_stats = retrieval.retrieveStats(execution_id)
+            except NexusProcessingException:
+                execution_stats = None
+
+        if not execution_params:
+            raise NexusProcessingException(
+                reason=f'No results found for execution {execution_id}',
+                code=404
+            )
+
+        # retrieveStats returns None when no stats row exists; treat as no matches.
+        execution_stats = execution_stats or {}
+        num_primary_matched = execution_stats.get('numPrimaryMatched') or 0
+        start_time = execution_params['startTime'].isoformat()
+        end_time = execution_params['endTime'].isoformat()
+        bbox = list(map(float, execution_params['bbox'].split(',')))
+
+        return StacResults(self.construct_collection(
+            execution_id, output_format, num_primary_matched, page_size,
+            start_time, end_time, bbox, host=host
+        ))
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index bc568f8..7e5715f 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -20,6 +20,7 @@ from . import ExecutionCancel
 from . import DatasetListQuery
 from . import DomsInitialization
 from . import MatchupQuery
+from . import StacCatalog
 from . import MetadataQuery
 from . import ResultsPlotQuery
 from . import ResultsRetrieval
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
index d5c1204..be9d332 100644
--- a/analysis/webservice/webmodel/NexusExecutionResults.py
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -60,6 +60,12 @@ def construct_done(status, created, completed, execution_id, params, host):
         ('JSON', 'application/json'),
         ('NETCDF', 'binary/octet-stream')
     ]
+    job_body['links'].append({
+        'href': f'{host}/cdmscatalog/{execution_id}',
+        'title': 'STAC Catalog for execution results',
+        'type': 'application/json',
+        'rel': 'stac'
+    })
     data_links = [{
         'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
         'title': 'Download results',