You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/08/21 17:49:34 UTC
[incubator-sdap-nexus] branch master updated: SDAP-455: Large Job Tracking (#249)
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 3071f18 SDAP-455: Large Job Tracking (#249)
3071f18 is described below
commit 3071f18acc9afd82b8d82cc8de28876e726ea963
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Mon Aug 21 10:49:28 2023 -0700
SDAP-455: Large Job Tracking (#249)
* Large job tracking
* updateExecution function updated so execution status is not changed to success until after all results are inserted
* cast exception to string when submitting job failure to db
* Fix bug where NetCDF is not generated for sat-to-sat matches
* fix typo
* pin numpy version due to Python 3.9+ requirement
* remove invalid comment
* Only update execution status if job is in running state
* reorder cancel operation. First update DB, then cancel spark job group
* SDAP-467: Pagination (#261)
* pagination
* update openapi spec
* Fix for cdmslist bug in pagination PR (#263)
* Fix for cdmslist bug
* Fix for custom results type for cdmslist query
---------
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
* SDAP-473: Job prioritization (#262)
* Update changelog
* job prioritization
* fix typo
* Add check for null __details
* remove unnecessary print statement
---------
Co-authored-by: Riley Kuttruff <72...@users.noreply.github.com>
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
* added type to execution details results links
* added status field to doms table initializer
* fix prioritize distance
* fixed bug where no primary tile matches resulted in error
---------
Co-authored-by: Riley Kuttruff <72...@users.noreply.github.com>
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
CHANGELOG.md | 5 +-
analysis/setup.py | 3 +-
.../webservice/algorithms/doms/BaseDomsHandler.py | 19 +-
.../webservice/algorithms/doms/DatasetListQuery.py | 14 +-
.../algorithms/doms/DomsInitialization.py | 3 +-
.../webservice/algorithms/doms/ExecutionCancel.py | 83 +++++++++
.../webservice/algorithms/doms/ExecutionStatus.py | 67 +++++++
.../webservice/algorithms/doms/ResultsRetrieval.py | 6 +-
.../webservice/algorithms/doms/ResultsStorage.py | 78 ++++++---
analysis/webservice/algorithms/doms/__init__.py | 2 +
analysis/webservice/algorithms_spark/Matchup.py | 193 ++++++++++++++-------
.../NexusCalcSparkTornadoHandler.py | 15 +-
analysis/webservice/apidocs/openapi.yml | 52 ++++++
analysis/webservice/config/scheduler.xml | 10 ++
.../app_builders/HandlerArgsBuilder.py | 1 +
.../app_builders/SparkContextBuilder.py | 9 +-
.../request/handlers/NexusRequestHandler.py | 22 ++-
.../webservice/webmodel/NexusExecutionResults.py | 150 ++++++++++++++++
data-access/requirements.txt | 2 +
19 files changed, 630 insertions(+), 104 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 147dac9..14b2576 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## Unreleased
+## [Unreleased]
### Added
+- SDAP-467: Added pagination to cdmsresults endpoint
- SDAP-461: Added 4 remaining Saildrone insitu datasets.
+- SDAP-473: Added support for matchup job prioritization
### Changed
- SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`.
- **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py`
@@ -14,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Made the `output` parameter case-insensitive
- Improved speed of results insert
- Updated `id` field of insitu points to include depth. This solves an issue with NetCDF result rendering where different insitu observations at the same place & time but at different depths were being excluded for having the same `id`.
+- SDAP-455: Large job tracking
- SDAP-466: Matchup now defines secondary `platform` fields with `platform.id` if it is available and not blank. It then uses `platform.code` and `platform.type` as fallbacks, then just the value of `platform` if none work.
- SDAP-468: Updated matchup output filename
- SDAP-482: Updated Saildrone in situ endpoint in config file
diff --git a/analysis/setup.py b/analysis/setup.py
index 99cd707..8fbc617 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -60,7 +60,8 @@ setuptools.setup(
'config/algorithms.ini',
'apidocs/index.html',
'apidocs/openapi.yml',
- 'apidocs/dataset-populate.js'
+ 'apidocs/dataset-populate.js',
+ 'config/scheduler.xml'
],
'webservice.algorithms.doms': ['domsconfig.ini.default'],
},
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index af2e6d3..84c9163 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -85,7 +85,7 @@ class DomsEncoder(json.JSONEncoder):
class DomsQueryResults(NexusResults):
def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
- executionId=None, status_code=200):
+ executionId=None, status_code=200, page_num=None, page_size=None):
NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
status_code=status_code)
self.__args = args
@@ -94,6 +94,13 @@ class DomsQueryResults(NexusResults):
self.__details = details
self.__executionId = str(executionId)
+ if self.__details is None:
+ self.__details = {}
+
+ # Add page num and size to details block
+ self.__details['pageNum'] = page_num
+ self.__details['pageSize'] = page_size
+
def toJson(self):
bounds = self.__bounds.toMap() if self.__bounds is not None else {}
return json.dumps(
@@ -279,6 +286,9 @@ class DomsCSVFormatter:
{"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
{"Global Attribute": "URI_Matchup", "Value": "https://doms.jpl.nasa.gov/domsresults?id=" + executionId + "&output=CSV"}, # TODO how to replace with actual req URL
+
+ {"Global Attribute": "CDMS_page_num", "Value": details["pageNum"]},
+ {"Global Attribute": "CDMS_page_size", "Value": details["pageSize"]},
]
writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
@@ -329,6 +339,8 @@ class DomsNetCDFFormatter:
dataset.CDMS_primary = params["primary"]
dataset.CDMS_time_to_complete = details["timeToComplete"]
dataset.CDMS_time_to_complete_units = "seconds"
+ dataset.CDMS_page_num = details["pageNum"]
+ dataset.CDMS_page_size = details["pageSize"]
insituDatasets = params["matchup"]
insituLinks = set()
@@ -408,7 +420,10 @@ class DomsNetCDFFormatter:
# Add each match only if it is not already in the array of in situ points
for match in result["matches"]:
- key = (match['id'], f'{match["depth"]:.4}')
+ depth_str = ''
+ if match['depth'] is not None:
+ depth_str = f'{match["depth"]:.4}'
+ key = (match['id'], depth_str)
if key not in ids:
ids[key] = insituIndex
diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py
index 78ba6ab..7c40d18 100644
--- a/analysis/webservice/algorithms/doms/DatasetListQuery.py
+++ b/analysis/webservice/algorithms/doms/DatasetListQuery.py
@@ -23,7 +23,7 @@ from . import config
from . import values
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import cached
+from webservice.webmodel import cached, NexusResults
@nexus_handler
@@ -120,4 +120,14 @@ class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
"insitu": insituList
}
- return BaseDomsHandler.DomsQueryResults(results=values)
+ return DatasetListResults(results=values)
+
+
+class DatasetListResults(NexusResults):
+ def __init__(self, results=None):
+ NexusResults.__init__(self, results=results)
+
+ self.__results = results
+
+ def toJson(self):
+ return json.dumps({'data': self.__results}, indent=4)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index b695c9e..ed7db20 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -120,7 +120,8 @@ class DomsInitializer:
id uuid PRIMARY KEY,
time_started timestamp,
time_completed timestamp,
- user_email text
+ user_email text,
+ status text
);
"""
session.execute(cql)
diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py
new file mode 100644
index 0000000..466428c
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
+from datetime import datetime
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(NexusCalcSparkTornadoHandler):
+ name = 'Execution Status Handler'
+ path = '/job/cancel'
+ description = ''
+ params = {}
+ singleton = True
+
+ def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
+ NexusCalcSparkTornadoHandler.__init__(
+ self,
+ algorithm_config=algorithm_config,
+ sc=sc,
+ tile_service_factory=tile_service_factory
+ )
+ self.tile_service_factory = tile_service_factory
+ self.config = config
+
+ def calc(self, request, tornado_io_loop, **args):
+ execution_id = request.get_argument('id', None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+ except ValueError:
+ raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+ with ResultsRetrieval(self.config) as retrieval:
+ try:
+ execution_details = retrieval.retrieveExecution(execution_id)
+ except ValueError:
+ raise NexusProcessingException(
+ reason=f'Execution {execution_id} not found',
+ code=404
+ )
+
+ job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+
+ # Only proceed if status is "running". Otherwise, noop
+ if job_status == ExecutionStatus.RUNNING:
+ # Update job status to "cancelled"
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ execution_id,
+ completeTime=end,
+ status=ExecutionStatus.CANCELLED.value,
+ message=None,
+ stats=None,
+ results=None
+ )
+
+ # Cancel Spark job
+ self._sc.cancelJobGroup(str(execution_id))
+
+ # Redirect to job status endpoint
+ request.requestHandler.redirect(f'/job?id={execution_id}')
diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py
new file mode 100644
index 0000000..1bae455
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from . import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = 'Execution Status Handler'
+ path = '/job'
+ description = ''
+ params = {}
+ singleton = True
+
+ def __init__(self, tile_service_factory, config=None):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+ self.config = config
+
+ def calc(self, request, **args):
+ execution_id = request.get_argument('id', None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+ except ValueError:
+ raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+ # Check if the job is done
+ with ResultsRetrieval(self.config) as retrieval:
+ try:
+ execution_details = retrieval.retrieveExecution(execution_id)
+ execution_params = retrieval.retrieveParams(execution_id)
+ except ValueError:
+ raise NexusProcessingException(
+ reason=f'Execution {execution_id} not found',
+ code=404
+ )
+
+ job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+ host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}'
+
+ return NexusExecutionResults.NexusExecutionResults(
+ status=job_status,
+ created=execution_details['timeStarted'],
+ completed=execution_details['timeCompleted'],
+ execution_id=execution_id,
+ message=execution_details['message'],
+ params=execution_params,
+ host=host
+ )
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
index c3b95b0..f03c1ca 100644
--- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -35,6 +35,8 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
def calc(self, computeOptions, **args):
execution_id = computeOptions.get_argument("id", None)
+ page_num = computeOptions.get_int_arg('pageNum', default=1)
+ page_size = computeOptions.get_int_arg('pageSize', default=1000)
try:
execution_id = uuid.UUID(execution_id)
@@ -44,7 +46,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)
with ResultsStorage.ResultsRetrieval(self.config) as storage:
- params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
+ params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results, page_num=page_num, page_size=page_size)
return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data),
- computeOptions=None, executionId=execution_id)
+ computeOptions=None, executionId=execution_id, page_num=page_num, page_size=page_size)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index d0ead7e..6b210c0 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -26,7 +26,7 @@ import pkg_resources
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
-from cassandra.query import BatchStatement
+from cassandra.query import BatchStatement, SimpleStatement
from pytz import UTC
from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
from webservice.webmodel import NexusProcessingException
@@ -106,24 +106,40 @@ class ResultsStorage(AbstractResultsContainer):
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
- def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
- self._log.info('Beginning results write')
+ def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+ """
+ Initial insert into database for CDMS matchup request. This
+ populates the execution and params table.
+ """
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
- execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+ execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
self.__insertParams(execution_id, params)
- self.__insertStats(execution_id, stats)
- self.__insertResults(execution_id, results)
- self._log.info('Results write finished')
return execution_id
- def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+ def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+ if stats:
+ self.__insertStats(execution_id, stats)
+ if results:
+ self.__insertResults(execution_id, results)
+ self.__updateExecution(execution_id, completeTime, status, message)
+
+ def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+ """
+ Insert new entry into execution table
+ """
if execution_id is None:
execution_id = uuid.uuid4()
- cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
- self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+ cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+ self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+ return execution_id
+
+ def __updateExecution(self, execution_id, complete_time, status, message=None):
+ # Only update the status if it's "running". Any other state is immutable.
+ cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s IF status = 'running'"
+ self._session.execute(cql, (complete_time, status, message, execution_id))
return execution_id
def __insertParams(self, execution_id, params):
@@ -259,17 +275,17 @@ class ResultsRetrieval(AbstractResultsContainer):
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
- def retrieveResults(self, execution_id, trim_data=False):
+ def retrieveResults(self, execution_id, trim_data=False, page_num=1, page_size=1000):
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
- params = self.__retrieveParams(execution_id)
+ params = self.retrieveParams(execution_id)
stats = self.__retrieveStats(execution_id)
- data = self.__retrieveData(execution_id, trim_data=trim_data)
+ data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size)
return params, stats, data
- def __retrieveData(self, id, trim_data=False):
- dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
+ def __retrieveData(self, id, trim_data=False, page_num=1, page_size=1000):
+ dataMap = self.__retrievePrimaryData(id, trim_data=trim_data, page_num=page_num, page_size=page_size)
self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
data = [dataMap[name] for name in dataMap]
return data
@@ -284,15 +300,13 @@ class ResultsRetrieval(AbstractResultsContainer):
if not "matches" in dataMap[row.primary_value_id]:
dataMap[row.primary_value_id]["matches"] = []
dataMap[row.primary_value_id]["matches"].append(entry)
- else:
- print(row)
- def __retrievePrimaryData(self, id, trim_data=False):
- cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
- rows = self._session.execute(cql, (id,))
+ def __retrievePrimaryData(self, id, trim_data=False, page_num=2, page_size=10):
+ cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true limit %s"
+ rows = self._session.execute(cql, [id, page_num * page_size])
dataMap = {}
- for row in rows:
+ for row in rows[(page_num-1)*page_size:page_num*page_size]:
entry = self.__rowToDataEntry(row, trim_data=trim_data)
dataMap[row.value_id] = entry
return dataMap
@@ -343,7 +357,7 @@ class ResultsRetrieval(AbstractResultsContainer):
raise Exception("Execution not found with id '%s'" % id)
- def __retrieveParams(self, id):
+ def retrieveParams(self, id):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
@@ -368,3 +382,23 @@ class ResultsRetrieval(AbstractResultsContainer):
return params
raise Exception("Execution not found with id '%s'" % id)
+
+ def retrieveExecution(self, execution_id):
+ """
+ Retrieve execution details from database.
+
+ :param execution_id: Execution ID
+ :return: execution status dictionary
+ """
+
+ cql = "SELECT * FROM doms_executions where id = %s limit 1"
+ rows = self._session.execute(cql, (execution_id,))
+ for row in rows:
+ return {
+ 'status': row.status,
+ 'message': row.message,
+ 'timeCompleted': row.time_completed,
+ 'timeStarted': row.time_started
+ }
+
+ raise ValueError('Execution not found with id %s', execution_id)
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index 9580d38..bc568f8 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -15,6 +15,8 @@
from . import BaseDomsHandler
+from . import ExecutionStatus
+from . import ExecutionCancel
from . import DatasetListQuery
from . import DomsInitialization
from . import MatchupQuery
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index f27612a..a55f61d 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
+import uuid
from typing import Optional
import logging
import threading
@@ -30,20 +29,23 @@ from pytz import timezone, UTC
from scipy import spatial
from shapely import wkt
from shapely.geometry import box
+import functools
from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
from webservice.algorithms.doms.ResultsStorage import ResultsStorage
from webservice.algorithms.doms.insitu import query_insitu as query_edge
from webservice.algorithms.doms.insitu import query_insitu_schema
from webservice.webmodel import NexusProcessingException
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+LARGE_JOB_THRESHOLD = 4000
+
class Schema:
def __init__(self):
@@ -66,7 +68,7 @@ def iso_time_to_epoch(str_time):
@nexus_handler
-class Matchup(NexusCalcSparkHandler):
+class Matchup(NexusCalcSparkTornadoHandler):
name = "Matchup"
path = "/match_spark"
description = "Match measurements between two or more datasets"
@@ -154,7 +156,12 @@ class Matchup(NexusCalcSparkHandler):
singleton = True
def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
- NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
+ NexusCalcSparkTornadoHandler.__init__(
+ self,
+ algorithm_config=algorithm_config,
+ sc=sc,
+ tile_service_factory=tile_service_factory
+ )
self.log = logging.getLogger(__name__)
self.tile_service_factory = tile_service_factory
self.config = config
@@ -229,42 +236,82 @@ class Matchup(NexusCalcSparkHandler):
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit, prioritize_distance
- def calc(self, request, **args):
- start = datetime.utcnow()
- # TODO Assuming Satellite primary
- bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
- start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
- depth_min, depth_max, time_tolerance, radius_tolerance, \
- platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
-
- with ResultsStorage(self.config) as resultsStorage:
+ def get_job_pool(self, tile_ids):
+ if len(tile_ids) > LARGE_JOB_THRESHOLD:
+ return 'large'
+ return 'small'
- execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
- self.log.debug("Querying for tiles in search domain")
- # Get tile ids in box
- tile_ids = [tile.tile_id for tile in
- self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
- start_seconds_from_epoch, end_seconds_from_epoch,
- fetch_data=False, fl='id',
- sort=['tile_min_time_dt asc', 'tile_min_lon asc',
- 'tile_min_lat asc'], rows=5000)]
-
- self.log.info('Found %s tile_ids', len(tile_ids))
+ def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
+ secondary_ds_names, parameter_s, start_time, end_time, depth_min,
+ depth_max, time_tolerance, radius_tolerance, platforms, match_once,
+ result_size_limit, start, prioritize_distance):
# Call spark_matchup
self.log.debug("Calling Spark Driver")
+
+ job_priority = self.get_job_pool(tile_ids)
+
try:
- spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
- secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
- radius_tolerance, platforms, match_once, self.tile_service_factory,
- prioritize_distance, sc=self._sc)
- except Exception as e:
- self.log.exception(e)
- raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+ self._sc.setJobGroup(execution_id, execution_id)
+ self._sc.setLocalProperty('spark.scheduler.pool', job_priority)
+ spark_result = spark_matchup_driver(
+ tile_ids, wkt.dumps(bounding_polygon),
+ primary_ds_name,
+ secondary_ds_names,
+ parameter_s,
+ depth_min,
+ depth_max, time_tolerance,
+ radius_tolerance,
+ platforms,
+ match_once,
+ self.tile_service_factory,
+ sc=self._sc,
+ prioritize_distance=prioritize_distance
+ )
+ except Exception as error:
+ self.log.exception(error)
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.FAILED.value,
+ message=str(error),
+ stats=None,
+ results=None
+ )
+ return
+ self.log.debug("Building and saving results")
end = datetime.utcnow()
- self.log.debug("Building and saving results")
+ total_keys = len(list(spark_result.keys()))
+ total_values = sum(len(v) for v in spark_result.values())
+ details = {
+ "timeToComplete": int((end - start).total_seconds()),
+ "numSecondaryMatched": total_values,
+ "numPrimaryMatched": total_keys
+ }
+
+ matches = Matchup.convert_to_matches(spark_result)
+
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.SUCCESS.value,
+ message=None,
+ stats=details,
+ results=matches
+ )
+
+ def calc(self, request, tornado_io_loop, **args):
+ start = datetime.utcnow()
+ # TODO Assuming Satellite primary
+ bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
+ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+ depth_min, depth_max, time_tolerance, radius_tolerance, \
+ platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
+
args = {
"primary": primary_ds_name,
"matchup": secondary_ds_names,
@@ -283,37 +330,63 @@ class Matchup(NexusCalcSparkHandler):
if depth_max is not None:
args["depthMax"] = float(depth_max)
- total_keys = len(list(spark_result.keys()))
- total_values = sum(len(v) for v in spark_result.values())
- details = {
- "timeToComplete": int((end - start).total_seconds()),
- "numSecondaryMatched": total_values,
- "numPrimaryMatched": total_keys
- }
- matches = Matchup.convert_to_matches(spark_result)
+ with ResultsStorage(self.config) as resultsStorage:
+ execution_id = str(resultsStorage.insertInitialExecution(
+ params=args,
+ startTime=start,
+ status=ExecutionStatus.RUNNING.value
+ ))
- def do_result_insert():
- with ResultsStorage(self.config) as storage:
- storage.insertResults(results=matches, params=args, stats=details,
- startTime=start, completeTime=end, userEmail="",
- execution_id=execution_id)
+ self.log.debug("Querying for tiles in search domain")
+ # Get tile ids in box
+ tile_ids = [tile.tile_id for tile in
+ self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
- threading.Thread(target=do_result_insert).start()
+ self.log.info('Found %s tile_ids', len(tile_ids))
- # Get only the first "result_size_limit" results
- # '0' means returns everything
- if result_size_limit > 0:
- return_matches = matches[0:result_size_limit]
- else:
- return_matches = matches
+ if not tile_ids:
+ # There are no matching tiles
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.FAILED.value,
+ message='No tiles matched the provided domain',
+ stats=None,
+ results=None
+ )
- result = DomsQueryResults(results=return_matches, args=args,
- details=details, bounds=None,
- count=len(matches), computeOptions=None,
- executionId=execution_id)
+ # Start async processing with Spark. Do not wait for response
+ # before returning to user.
+ tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+ self.async_calc,
+ execution_id=execution_id,
+ tile_ids=tile_ids,
+ bounding_polygon=bounding_polygon,
+ primary_ds_name=primary_ds_name,
+ secondary_ds_names=secondary_ds_names,
+ parameter_s=parameter_s,
+ start_time=start_time,
+ end_time=end_time,
+ depth_min=depth_min,
+ depth_max=depth_max,
+ time_tolerance=time_tolerance,
+ radius_tolerance=radius_tolerance,
+ platforms=platforms,
+ match_once=match_once,
+ result_size_limit=result_size_limit,
+ start=start,
+ prioritize_distance=prioritize_distance
+ ))
+
+ request.requestHandler.redirect(f'/job?id={execution_id}')
- return result
@classmethod
def convert_to_matches(cls, spark_result):
diff --git a/data-access/requirements.txt b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
similarity index 79%
copy from data-access/requirements.txt
copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
index 5127018..c2a38c5 100644
--- a/data-access/requirements.txt
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cassandra-driver==3.24.0
-pysolr==3.9.0
-elasticsearch==8.3.1
-urllib3==1.26.2
-requests
-nexusproto
-Shapely
+import logging
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+
+logger = logging.getLogger(__name__)
+
+
+class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler):
+    """Marker base for Spark handlers that respond to the Tornado request themselves.
+
+    NexusRequestHandler special-cases instances of this class: it calls
+    ``calc(request, io_loop)`` synchronously on the IOLoop thread instead of
+    through its thread-pool executor, and it does not render the returned
+    value. Subclasses must therefore hand long-running work off to an
+    executor (e.g. ``io_loop.run_in_executor``) and reply on their own, for
+    example by redirecting to the ``/job`` status page.
+    """
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index e691f86..ea9b16b 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -535,6 +535,22 @@ paths:
type: string
enum: ['CSV', 'NETCDF', 'JSON']
example: CSV
+ # Pagination (SDAP-467): pageNum picks which page of results is returned;
+ # pageSize sets how many items each page holds.
+ - in: query
+ name: pageNum
+ description: |
+ Page of results to retrieve.
+ required: false
+ schema:
+ type: integer
+ example: 1
+ - in: query
+ name: pageSize
+ description: |
+ Number of items to return in each page.
+ required: false
+ schema:
+ type: integer
+ example: 100
- in: query
name: filename
description: |
@@ -669,6 +685,42 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/Error'
+  /job:
+    get:
+      summary: |
+        Get job status
+      operationId: job
+      tags:
+        - Analytics
+      description: "Get job status by execution id"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
+      # OpenAPI requires every operation to declare its responses.
+      responses:
+        '200':
+          description: Current status of the job
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  status:
+                    type: string
+                    enum: ['running', 'success', 'failed', 'cancelled']
+                  message:
+                    type: string
+                  createdAt:
+                    description: Time the job was created
+                  updatedAt:
+                    description: Completion time; null while the job is running
+                  links:
+                    type: array
+                    items:
+                      type: object
+                  params:
+                    description: Parameters the job was submitted with
+                  jobID:
+                    type: string
+                    format: uuid
+  /job/cancel:
+    get:
+      summary: |
+        Cancel running job
+      operationId: jobCancel
+      tags:
+        - Analytics
+      description: "Cancel running job"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
+      responses:
+        '200':
+          description: Successful operation
externalDocs:
description: Documentation
url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html
diff --git a/analysis/webservice/config/scheduler.xml b/analysis/webservice/config/scheduler.xml
new file mode 100644
index 0000000..3016017
--- /dev/null
+++ b/analysis/webservice/config/scheduler.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0"?>
+<!--
+  Spark FAIR scheduler pools, loaded through spark.scheduler.allocation.file
+  (see SparkContextBuilder). No pool sets <schedulingMode>, so jobs within a
+  single pool are scheduled FIFO, which is Spark's default.
+-->
+<allocations>
+    <!--
+      "small": weight 1000 vs. 1 for "large", so it receives nearly all
+      cluster resources while both pools have active jobs; minShare
+      guarantees it at least 1 CPU core before weights are applied.
+    -->
+    <pool name="small">
+        <weight>1000</weight>
+        <minShare>1</minShare>
+    </pool>
+    <!-- "large": lowest-priority pool (weight 1). -->
+    <pool name="large">
+        <weight>1</weight>
+    </pool>
+</allocations>
\ No newline at end of file
diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index fed08b9..2a84ae7 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -37,6 +37,7 @@ class HandlerArgsBuilder:
class_wrapper == webservice.algorithms_spark.Matchup.Matchup
or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms
or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler)
+ or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler)
or class_wrapper == webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler
or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler
)
diff --git a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
index ee3fd2f..5daf279 100644
--- a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import pkg_resources
class SparkContextBuilder:
@@ -25,7 +26,13 @@ class SparkContextBuilder:
if cls.spark_context is None:
from pyspark.sql import SparkSession
- spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
+ scheduler_path = pkg_resources.resource_filename('webservice', "config/scheduler.xml")
+
+ spark = SparkSession.builder.appName("nexus-analysis").config(
+ "spark.scheduler.allocation.file", scheduler_path
+ ).config(
+ "spark.scheduler.mode", "FAIR"
+ ).getOrCreate()
cls.spark_context = spark.sparkContext
return cls.spark_context
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 2645574..95bddf4 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -20,6 +20,7 @@ import tornado.ioloop
from webservice.nexus_tornado.request.renderers import NexusRendererFactory
from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
class NexusRequestHandler(tornado.web.RequestHandler):
@@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler):
# create NexusCalcHandler which will process the request
instance = self.__clazz(**self._clazz_init_args)
+ io_loop = tornado.ioloop.IOLoop.current()
+
try:
- # process the request asynchronously on a different thread,
- # the current tornado handler is still available to get other user requests
- results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+ if isinstance(instance, NexusCalcSparkTornadoHandler):
+ results = instance.calc(request, io_loop)
+ else:
+ results = yield io_loop.run_in_executor(
+ self.executor,
+ instance.calc,
+ request
+ )
try:
self.set_status(results.status_code)
except AttributeError:
pass
- renderer = NexusRendererFactory.get_renderer(request)
- renderer.render(self, results)
+ # Only render results if there are results to render.
+ # "NexusCalcSparkTornadoHandler" endpoints redirect, so there is
+ # nothing to render.
+ if not isinstance(instance, NexusCalcSparkTornadoHandler):
+ renderer = NexusRendererFactory.get_renderer(request)
+ renderer.render(self, results)
except NexusProcessingException as e:
self.async_onerror_callback(e.reason, e.code)
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
new file mode 100644
index 0000000..d5c1204
--- /dev/null
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+# strftime/strptime format for ISO 8601 timestamps with a numeric UTC offset,
+# e.g. '2023-08-21T17:49:34+0000' (strftime renders %z as '' for naive datetimes).
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    """State of an asynchronous execution (job).
+
+    Each member's ``value`` is the lowercase string emitted as the ``status``
+    field of the job-status JSON. RUNNING jobs are rendered with a cancel
+    link; the other three states are rendered with a completion time.
+    """
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    """Build the base job-status document shared by every job state.
+
+    :param job_state: an ``ExecutionStatus``; its ``value`` becomes ``status``.
+    :param created: creation time, emitted as ``createdAt``.
+    :param updated: completion/update time, emitted as ``updatedAt``.
+    :param execution_id: job ID, emitted as ``jobID`` and used in the self link.
+    :param params: request parameters, emitted as ``params``.
+    :param host: base URL prefixed to the self link.
+    :param message: optional status message (empty by default).
+    :return: a dict holding a single ``self`` link; callers may append more.
+    """
+    state_value = job_state.value
+    self_link = {
+        'href': f'{host}/job?id={execution_id}',
+        'title': 'The current page',
+        'type': 'application/json',
+        'rel': 'self',
+    }
+    # json.dumps preserves insertion order, so keep the key order stable.
+    return dict(
+        status=state_value,
+        message=message,
+        createdAt=created,
+        updatedAt=updated,
+        links=[self_link],
+        params=params,
+        jobID=execution_id,
+    )
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    """Build the job document for a successfully completed execution.
+
+    Starts from ``construct_job_status`` and appends one ``data`` link per
+    download format (CSV, JSON, NETCDF) pointing at ``/cdmsresults``.
+    """
+    job_body = construct_job_status(status, created, completed, execution_id, params, host)
+
+    # (output format, MIME type) for each downloadable representation.
+    downloads = (
+        ('CSV', 'text/csv'),
+        ('JSON', 'application/json'),
+        ('NETCDF', 'binary/octet-stream'),
+    )
+    for output_format, mime in downloads:
+        job_body['links'].append({
+            'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+            'title': 'Download results',
+            'type': mime,
+            'rel': 'data',
+        })
+    return job_body
+
+
+def construct_running(status, created, execution_id, params, host):
+    """Build the job document for an execution that is still running.
+
+    There is no completion time yet, so ``updatedAt`` is None, and a
+    ``cancel`` link to ``/job/cancel`` follows the self link.
+    """
+    job_body = construct_job_status(status, created, None, execution_id, params, host)
+    cancel_link = {
+        'href': f'{host}/job/cancel?id={execution_id}',
+        'title': 'Cancel the job',
+        'rel': 'cancel',
+    }
+    job_body['links'].append(cancel_link)
+    return job_body
+
+
+def construct_error(status, created, completed, execution_id, message, params, host):
+    """Build the job document for a failed execution, including its error message."""
+    return construct_job_status(
+        job_state=status,
+        created=created,
+        updated=completed,
+        execution_id=execution_id,
+        params=params,
+        host=host,
+        message=message,
+    )
+
+
+def construct_cancelled(status, created, completed, execution_id, params, host):
+    """Build the job document for a cancelled execution (message left empty)."""
+    return construct_job_status(
+        job_state=status,
+        created=created,
+        updated=completed,
+        execution_id=execution_id,
+        params=params,
+        host=host,
+    )
+
+
+class NexusExecutionResults:
+    """Status of one asynchronous execution, serializable as job-status JSON.
+
+    :param status: an ``ExecutionStatus`` member selecting the document shape.
+    :param created: creation timestamp of the execution.
+    :param completed: completion timestamp; only emitted for SUCCESS, FAILED
+        and CANCELLED executions.
+    :param execution_id: the execution (job) ID.
+    :param message: failure message; only emitted for FAILED executions.
+    :param params: the request parameters recorded for the execution.
+    :param host: base URL used to build the job and result links.
+    :param status_code: HTTP status code for the response (default 200).
+    """
+
+    def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
+                 params=None, host=None, status_code=200):
+        self.status_code = status_code
+        self.status = status
+        self.created = created
+        self.completed = completed
+        self.execution_id = execution_id
+        self.message = message
+        self.execution_params = params
+        self.host = host
+
+    def toJson(self):
+        """Serialize this execution as an indented JSON job-status document.
+
+        :return: the JSON string built by the ``construct_*`` helper matching
+            ``status``.
+        :raises ValueError: if ``status`` is not an ``ExecutionStatus`` member.
+        """
+        # Keyword arguments shared by every construct_* helper.
+        params = {
+            'status': self.status,
+            'created': self.created,
+            'execution_id': self.execution_id,
+            'params': self.execution_params,
+            'host': self.host
+        }
+        if self.status == ExecutionStatus.SUCCESS:
+            params['completed'] = self.completed
+            construct = construct_done
+        elif self.status == ExecutionStatus.RUNNING:
+            # Running jobs have no completion time yet.
+            construct = construct_running
+        elif self.status == ExecutionStatus.FAILED:
+            params['completed'] = self.completed
+            params['message'] = self.message
+            construct = construct_error
+        elif self.status == ExecutionStatus.CANCELLED:
+            params['completed'] = self.completed
+            construct = construct_cancelled
+        else:
+            # Job state is invalid; there is no document shape for it.
+            # Format the ID into the message: ValueError does not apply '{}'
+            # placeholders to its extra arguments.
+            raise ValueError(f'Unable to fetch status for execution {self.execution_id}')
+
+        job_details = construct(**params)
+        # default=str stringifies values json cannot encode natively
+        # (presumably the timestamps/UUIDs read from storage).
+        return json.dumps(job_details, indent=4, default=str)
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index 5127018..f91f180 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Pinned because newer numpy releases require Python 3.9+.
+numpy==1.24.3
 cassandra-driver==3.24.0
 pysolr==3.9.0
 elasticsearch==8.3.1