Posted to commits@sdap.apache.org by sk...@apache.org on 2023/05/10 00:47:08 UTC
[incubator-sdap-nexus] 01/01: Large job tracking
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch SDAP-455
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 8725ff6fc35c24436323b89edc5cf9b91901e886
Author: skorper <st...@gmail.com>
AuthorDate: Tue May 9 17:46:54 2023 -0700
Large job tracking
---
CHANGELOG.md | 1 +
.../webservice/algorithms/doms/ExecutionCancel.py | 83 ++++++++++
.../webservice/algorithms/doms/ExecutionStatus.py | 69 ++++++++
.../webservice/algorithms/doms/ResultsStorage.py | 61 +++++--
analysis/webservice/algorithms/doms/__init__.py | 2 +
analysis/webservice/algorithms_spark/Matchup.py | 176 ++++++++++++++-------
.../NexusCalcSparkTornadoHandler.py} | 25 +--
analysis/webservice/apidocs/openapi.yml | 36 +++++
.../app_builders/HandlerArgsBuilder.py | 1 +
.../request/handlers/NexusRequestHandler.py | 22 ++-
.../webservice/webmodel/NexusExecutionResults.py | 149 +++++++++++++++++
docker/nexus-webapp/Dockerfile | 4 +-
12 files changed, 535 insertions(+), 94 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f509a7..5e524f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them.
- Added Saildrone's `baja_2018` insitu dataset.
+- SDAP-455: Large job tracking
### Changed
- SDAP-443:
- Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py
new file mode 100644
index 0000000..c9ef347
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
+from datetime import datetime
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionCancelHandler(NexusCalcSparkTornadoHandler):
+ name = 'Execution Cancel Handler'
+ path = '/job/cancel'
+ description = 'Cancel a running job'
+ params = {}
+ singleton = True
+
+ def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
+ NexusCalcSparkTornadoHandler.__init__(
+ self,
+ algorithm_config=algorithm_config,
+ sc=sc,
+ tile_service_factory=tile_service_factory
+ )
+ self.tile_service_factory = tile_service_factory
+ self.config = config
+
+ def calc(self, request, tornado_io_loop, **args):
+ execution_id = request.get_argument('id', None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+ except ValueError:
+ raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+ with ResultsRetrieval(self.config) as retrieval:
+ try:
+ execution_details = retrieval.retrieveExecution(execution_id)
+ except ValueError:
+ raise NexusProcessingException(
+ reason=f'Execution {execution_id} not found',
+ code=404
+ )
+
+ job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+
+ # Only proceed if status is "running". Otherwise, noop
+ if job_status == ExecutionStatus.RUNNING:
+ # Cancel Spark job
+ self._sc.cancelJobGroup(str(execution_id))
+
+ # Update job status to "cancelled"
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ execution_id,
+ completeTime=end,
+ status=ExecutionStatus.CANCELLED.value,
+ message=None,
+ stats=None,
+ results=None
+ )
+
+ # Redirect to job status endpoint
+ request.requestHandler.redirect(f'/job?id={execution_id}')
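The cancel endpoint is a plain GET that redirects to the job status page. A minimal client sketch (the host, port, and use of the requests library are illustrative assumptions; only the /job/cancel route and id parameter come from this commit):

    import requests

    BASE_URL = 'http://localhost:8083'  # hypothetical deployment
    execution_id = 'c864a51b-3d87-4872-9070-632820b1cae2'  # example id from the API docs

    # GET /job/cancel redirects to /job, so the final body is the job status document.
    response = requests.get(f'{BASE_URL}/job/cancel', params={'id': execution_id})
    print(response.json()['status'])  # 'cancelled' if the job had been running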
diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py
new file mode 100644
index 0000000..90dced8
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from . import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+ name = 'Execution Status Handler'
+ path = '/job'
+ description = 'Get job status by execution id'
+ params = {}
+ singleton = True
+
+ def __init__(self, tile_service_factory, config=None):
+ BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+ self.config = config
+
+ def calc(self, request, **args):
+ execution_id = request.get_argument('id', None)
+
+ try:
+ execution_id = uuid.UUID(execution_id)
+ except ValueError:
+ raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+ # Check if the job is done
+ with ResultsRetrieval(self.config) as retrieval:
+ try:
+ execution_details = retrieval.retrieveExecution(execution_id)
+ execution_params = retrieval.retrieveParams(execution_id)
+ except ValueError:
+ raise NexusProcessingException(
+ reason=f'Execution {execution_id} not found',
+ code=404
+ )
+
+ # TODO check if job ID is valid; raise a meaningful error if not
+
+ job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+ host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}'
+
+ return NexusExecutionResults.NexusExecutionResults(
+ status=job_status,
+ created=execution_details['timeStarted'],
+ completed=execution_details['timeCompleted'],
+ execution_id=execution_id,
+ message=execution_details['message'],
+ params=execution_params,
+ host=host
+ )
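A minimal polling sketch against the new /job endpoint, assuming the JSON body produced by NexusExecutionResults further down in this commit (the client code itself is illustrative):

    import time
    import requests

    BASE_URL = 'http://localhost:8083'  # hypothetical deployment

    def wait_for_job(execution_id, interval=30):
        """Poll /job until the execution leaves the 'running' state."""
        while True:
            body = requests.get(f'{BASE_URL}/job', params={'id': execution_id}).json()
            if body['status'] != 'running':
                return body
            time.sleep(interval)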
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index c989286..7ca8374 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -98,24 +98,39 @@ class ResultsStorage(AbstractResultsContainer):
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
- def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
- self._log.info('Beginning results write')
+ def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+ """
+ Initial insert into database for CDMS matchup request. This
+ populates the execution and params table.
+ """
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
- execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+ execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
self.__insertParams(execution_id, params)
- self.__insertStats(execution_id, stats)
- self.__insertResults(execution_id, results)
- self._log.info('Results write finished')
return execution_id
- def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+ def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+ self.__updateExecution(execution_id, completeTime, status, message)
+ if stats:
+ self.__insertStats(execution_id, stats)
+ if results:
+ self.__insertResults(execution_id, results)
+
+ def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+ """
+ Insert new entry into execution table
+ """
if execution_id is None:
execution_id = uuid.uuid4()
- cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
- self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+ cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+ self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+ return execution_id
+
+ def __updateExecution(self, execution_id, complete_time, status, message=None):
+ cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s"
+ self._session.execute(cql, (complete_time, status, message, execution_id))
return execution_id
def __insertParams(self, execution_id, params):
@@ -248,7 +263,7 @@ class ResultsRetrieval(AbstractResultsContainer):
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
- params = self.__retrieveParams(execution_id)
+ params = self.retrieveParams(execution_id)
stats = self.__retrieveStats(execution_id)
data = self.__retrieveData(execution_id, trim_data=trim_data)
return params, stats, data
@@ -302,7 +317,9 @@ class ResultsRetrieval(AbstractResultsContainer):
"depth": row.depth
}
for key in row.measurement_values:
- value = float(row.measurement_values[key])
+ value = row.measurement_values[key]
+ if value is not None:
+ value = float(value)
entry[key] = value
return entry
@@ -321,7 +338,7 @@ class ResultsRetrieval(AbstractResultsContainer):
raise Exception("Execution not found with id '%s'" % id)
- def __retrieveParams(self, id):
+ def retrieveParams(self, id):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
@@ -341,3 +358,23 @@ class ResultsRetrieval(AbstractResultsContainer):
return params
raise Exception("Execution not found with id '%s'" % id)
+
+ def retrieveExecution(self, execution_id):
+ """
+ Retrieve execution details from database.
+
+ :param execution_id: Execution ID
+ :return: execution status dictionary
+ """
+
+ cql = "SELECT * FROM doms_executions where id = %s limit 1"
+ rows = self._session.execute(cql, (execution_id,))
+ for row in rows:
+ return {
+ 'status': row.status,
+ 'message': row.message,
+ 'timeCompleted': row.time_completed,
+ 'timeStarted': row.time_started
+ }
+
+ raise ValueError(f'Execution not found with id {execution_id}')
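These methods split the old single insertResults call into a two-step write: insertInitialExecution when the request is accepted, updateExecution when the Spark job finishes, fails, or is cancelled. A rough usage sketch under assumed config handling (the real caller is the Matchup handler below):

    from datetime import datetime
    from webservice.algorithms.doms.ResultsStorage import ResultsStorage
    from webservice.webmodel.NexusExecutionResults import ExecutionStatus

    config = None  # illustrative; the handlers pass their webservice config here

    # 1) Record the execution as soon as the request is accepted.
    with ResultsStorage(config) as storage:
        execution_id = storage.insertInitialExecution(
            params={'primary': 'example_dataset'},  # illustrative params
            startTime=datetime.utcnow(),
            status=ExecutionStatus.RUNNING.value
        )

    # 2) Close it out once the job reaches a terminal state.
    with ResultsStorage(config) as storage:
        storage.updateExecution(
            execution_id,
            completeTime=datetime.utcnow(),
            status=ExecutionStatus.SUCCESS.value,
            message=None,
            stats=None,    # real callers pass match statistics here
            results=None   # and the computed matches here
        )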
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index 9580d38..bc568f8 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -15,6 +15,8 @@
from . import BaseDomsHandler
+from . import ExecutionStatus
+from . import ExecutionCancel
from . import DatasetListQuery
from . import DomsInitialization
from . import MatchupQuery
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..fd998e6 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
+import uuid
from typing import Optional
import logging
import threading
@@ -30,16 +29,17 @@ from pytz import timezone, UTC
from scipy import spatial
from shapely import wkt
from shapely.geometry import box
+import functools
from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
from webservice.algorithms.doms.ResultsStorage import ResultsStorage
from webservice.algorithms.doms.insitu import query_insitu as query_edge
from webservice.algorithms.doms.insitu import query_insitu_schema
from webservice.webmodel import NexusProcessingException
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
@@ -66,7 +66,7 @@ def iso_time_to_epoch(str_time):
@nexus_handler
-class Matchup(NexusCalcSparkHandler):
+class Matchup(NexusCalcSparkTornadoHandler):
name = "Matchup"
path = "/match_spark"
description = "Match measurements between two or more datasets"
@@ -147,7 +147,12 @@ class Matchup(NexusCalcSparkHandler):
singleton = True
def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
- NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
+ NexusCalcSparkTornadoHandler.__init__(
+ self,
+ algorithm_config=algorithm_config,
+ sc=sc,
+ tile_service_factory=tile_service_factory
+ )
self.log = logging.getLogger(__name__)
self.tile_service_factory = tile_service_factory
self.config = config
@@ -219,7 +224,66 @@ class Matchup(NexusCalcSparkHandler):
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit
- def calc(self, request, **args):
+ def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
+ secondary_ds_names, parameter_s, start_time, end_time, depth_min,
+ depth_max, time_tolerance, radius_tolerance, platforms, match_once,
+ result_size_limit, start):
+ # Call spark_matchup
+ self.log.debug("Calling Spark Driver")
+
+ try:
+ self._sc.setJobGroup(execution_id, execution_id)
+ spark_result = spark_matchup_driver(
+ tile_ids, wkt.dumps(bounding_polygon),
+ primary_ds_name,
+ secondary_ds_names,
+ parameter_s,
+ depth_min,
+ depth_max, time_tolerance,
+ radius_tolerance,
+ platforms,
+ match_once,
+ self.tile_service_factory,
+ sc=self._sc
+ )
+ except Exception as error:
+ self.log.exception(error)
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.FAILED.value,
+ message=str(error),
+ stats=None,
+ results=None
+ )
+ return
+
+ self.log.debug("Building and saving results")
+ end = datetime.utcnow()
+
+ total_keys = len(list(spark_result.keys()))
+ total_values = sum(len(v) for v in spark_result.values())
+ details = {
+ "timeToComplete": int((end - start).total_seconds()),
+ "numSecondaryMatched": total_values,
+ "numPrimaryMatched": total_keys
+ }
+
+ matches = Matchup.convert_to_matches(spark_result)
+
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.SUCCESS.value,
+ message=None,
+ stats=details,
+ results=matches
+ )
+
+ def calc(self, request, tornado_io_loop, **args):
start = datetime.utcnow()
# TODO Assuming Satellite primary
bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
@@ -227,33 +291,6 @@ class Matchup(NexusCalcSparkHandler):
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit = self.parse_arguments(request)
- with ResultsStorage(self.config) as resultsStorage:
-
- execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
- self.log.debug("Querying for tiles in search domain")
- # Get tile ids in box
- tile_ids = [tile.tile_id for tile in
- self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
- start_seconds_from_epoch, end_seconds_from_epoch,
- fetch_data=False, fl='id',
- sort=['tile_min_time_dt asc', 'tile_min_lon asc',
- 'tile_min_lat asc'], rows=5000)]
-
- self.log.info('Found %s tile_ids', len(tile_ids))
- # Call spark_matchup
- self.log.debug("Calling Spark Driver")
- try:
- spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
- secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
- radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
- except Exception as e:
- self.log.exception(e)
- raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
-
- end = datetime.utcnow()
-
- self.log.debug("Building and saving results")
args = {
"primary": primary_ds_name,
"matchup": secondary_ds_names,
@@ -272,35 +309,60 @@ class Matchup(NexusCalcSparkHandler):
if depth_max is not None:
args["depthMax"] = float(depth_max)
- total_keys = len(list(spark_result.keys()))
- total_values = sum(len(v) for v in spark_result.values())
- details = {
- "timeToComplete": int((end - start).total_seconds()),
- "numSecondaryMatched": total_values,
- "numPrimaryMatched": total_keys
- }
- matches = Matchup.convert_to_matches(spark_result)
+ with ResultsStorage(self.config) as resultsStorage:
+ execution_id = str(resultsStorage.insertInitialExecution(
+ params=args,
+ startTime=start,
+ status=ExecutionStatus.RUNNING.value
+ ))
- def do_result_insert():
- with ResultsStorage(self.config) as storage:
- storage.insertResults(results=matches, params=args, stats=details,
- startTime=start, completeTime=end, userEmail="",
- execution_id=execution_id)
+ self.log.debug("Querying for tiles in search domain")
+ # Get tile ids in box
+ tile_ids = [tile.tile_id for tile in
+ self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
- threading.Thread(target=do_result_insert).start()
+ self.log.info('Found %s tile_ids', len(tile_ids))
- # Get only the first "result_size_limit" results
- # '0' means returns everything
- if result_size_limit > 0:
- matches = matches[0:result_size_limit]
+ if not tile_ids:
+ # There are no matching tiles; mark the execution failed and stop here
+ end = datetime.utcnow()
+ with ResultsStorage(self.config) as storage:
+ storage.updateExecution(
+ uuid.UUID(execution_id),
+ completeTime=end,
+ status=ExecutionStatus.FAILED.value,
+ message='No tiles matched the provided domain',
+ stats=None,
+ results=None
+ )
+ request.requestHandler.redirect(f'/job?id={execution_id}')
+ return
- result = DomsQueryResults(results=matches, args=args,
- details=details, bounds=None,
- count=len(matches), computeOptions=None,
- executionId=execution_id)
+ # Start async processing with Spark. Do not wait for response
+ # before returning to user.
+ tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+ self.async_calc,
+ execution_id=execution_id,
+ tile_ids=tile_ids,
+ bounding_polygon=bounding_polygon,
+ primary_ds_name=primary_ds_name,
+ secondary_ds_names=secondary_ds_names,
+ parameter_s=parameter_s,
+ start_time=start_time,
+ end_time=end_time,
+ depth_min=depth_min,
+ depth_max=depth_max,
+ time_tolerance=time_tolerance,
+ radius_tolerance=radius_tolerance,
+ platforms=platforms,
+ match_once=match_once,
+ result_size_limit=result_size_limit,
+ start=start
+ ))
+
+ request.requestHandler.redirect(f'/job?id={execution_id}')
- return result
@classmethod
def convert_to_matches(cls, spark_result):
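The reworked calc above no longer blocks on the Spark driver: it records the execution, schedules async_calc on the handler's executor, and redirects the client to /job immediately. The scheduling pattern, reduced to a self-contained sketch (all names here are illustrative; only the run_in_executor/functools.partial usage mirrors the commit):

    import functools
    import time
    from concurrent.futures import ThreadPoolExecutor

    from tornado.ioloop import IOLoop

    def long_running_job(execution_id):
        # Stand-in for spark_matchup_driver + ResultsStorage.updateExecution.
        time.sleep(2)
        print(f'{execution_id} finished')

    def handle_request(io_loop, executor, execution_id):
        # Hand the work to the executor and return right away; the real handler
        # then redirects to /job?id=<execution_id>.
        io_loop.run_in_executor(executor, functools.partial(long_running_job, execution_id))

    executor = ThreadPoolExecutor(max_workers=2)
    loop = IOLoop.current()
    loop.call_later(0, handle_request, loop, executor, 'example-id')
    loop.call_later(3, loop.stop)
    loop.start()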
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
similarity index 61%
copy from analysis/webservice/algorithms/doms/__init__.py
copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
index 9580d38..c2a38c5 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
@@ -13,22 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
-from . import BaseDomsHandler
-from . import DatasetListQuery
-from . import DomsInitialization
-from . import MatchupQuery
-from . import MetadataQuery
-from . import ResultsPlotQuery
-from . import ResultsRetrieval
-from . import ResultsStorage
-from . import StatsQuery
-from . import ValuesQuery
-from . import config
-from . import datafetch
-from . import fetchedgeimpl
-from . import geo
-from . import insitusubset
-from . import subsetter
-from . import values
-from . import workerthread
+logger = logging.getLogger(__name__)
+
+
+class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler):
+ pass
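NexusCalcSparkTornadoHandler is an empty marker base class: NexusRequestHandler (below) checks isinstance against it, passes the Tornado IO loop into calc, and skips the renderer because these handlers answer the request themselves. A hypothetical subclass sketch showing the expected shape (the handler name and path are made up for illustration):

    from webservice.NexusHandler import nexus_handler
    from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler

    @nexus_handler
    class ExampleTornadoHandler(NexusCalcSparkTornadoHandler):
        name = 'Example Tornado Handler'  # illustrative
        path = '/example'                 # illustrative
        description = ''
        params = {}
        singleton = True

        def calc(self, request, tornado_io_loop, **args):
            # Schedule long-running work on tornado_io_loop here, then respond
            # directly, e.g. by redirecting to the job status endpoint.
            request.requestHandler.redirect('/job?id=<execution-id>')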
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..3f3490b 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -646,6 +646,42 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/Error'
+ /job:
+ get:
+ summary: |
+ Get job status
+ operationId: job
+ tags:
+ - Analytics
+ description: "Get job status by execution id"
+ parameters:
+ - in: query
+ name: id
+ description: |
+ The job execution ID
+ required: true
+ schema:
+ type: string
+ format: uuid
+ example: c864a51b-3d87-4872-9070-632820b1cae2
+ /job/cancel:
+ get:
+ summary: |
+ Cancel running job
+ operationId: jobCancel
+ tags:
+ - Analytics
+ description: "Cancel running job"
+ parameters:
+ - in: query
+ name: id
+ description: |
+ The job execution ID
+ required: true
+ schema:
+ type: string
+ format: uuid
+ example: c864a51b-3d87-4872-9070-632820b1cae2
externalDocs:
description: Documentation
url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html
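An illustrative /job response for a running execution, based on construct_job_status and construct_running in NexusExecutionResults.py below (host and values are examples, not captured output):

    {
        "status": "running",
        "message": "",
        "createdAt": "2023-05-09 17:46:54",
        "updatedAt": null,
        "links": [
            {
                "href": "http://localhost:8083/job?id=c864a51b-3d87-4872-9070-632820b1cae2",
                "title": "The current page",
                "type": "application/json",
                "rel": "self"
            },
            {
                "href": "http://localhost:8083/job/cancel?id=c864a51b-3d87-4872-9070-632820b1cae2",
                "title": "Cancel the job",
                "rel": "cancel"
            }
        ],
        "params": {"primary": "example_dataset"},
        "jobID": "c864a51b-3d87-4872-9070-632820b1cae2"
    }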
diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index fed08b9..2a84ae7 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -37,6 +37,7 @@ class HandlerArgsBuilder:
class_wrapper == webservice.algorithms_spark.Matchup.Matchup
or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms
or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler)
+ or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler)
or class_wrapper == webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler
or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler
)
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 2645574..95bddf4 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -20,6 +20,7 @@ import tornado.ioloop
from webservice.nexus_tornado.request.renderers import NexusRendererFactory
from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
class NexusRequestHandler(tornado.web.RequestHandler):
@@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler):
# create NexusCalcHandler which will process the request
instance = self.__clazz(**self._clazz_init_args)
+ io_loop = tornado.ioloop.IOLoop.current()
+
try:
- # process the request asynchronously on a different thread,
- # the current tornado handler is still available to get other user requests
- results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+ if isinstance(instance, NexusCalcSparkTornadoHandler):
+ results = instance.calc(request, io_loop)
+ else:
+ results = yield io_loop.run_in_executor(
+ self.executor,
+ instance.calc,
+ request
+ )
try:
self.set_status(results.status_code)
except AttributeError:
pass
- renderer = NexusRendererFactory.get_renderer(request)
- renderer.render(self, results)
+ # Only render results if there are results to render.
+ # "NexusCalcSparkTornadoHandler" endpoints redirectm so no
+ # need to render.
+ if not isinstance(instance, NexusCalcSparkTornadoHandler):
+ renderer = NexusRendererFactory.get_renderer(request)
+ renderer.render(self, results)
except NexusProcessingException as e:
self.async_onerror_callback(e.reason, e.code)
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
new file mode 100644
index 0000000..736ba76
--- /dev/null
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+ RUNNING = 'running'
+ SUCCESS = 'success'
+ FAILED = 'failed'
+ CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+ return {
+ 'status': job_state.value,
+ 'message': message,
+ 'createdAt': created,
+ 'updatedAt': updated,
+ 'links': [{
+ 'href': f'{host}/job?id={execution_id}',
+ 'title': 'The current page',
+ 'type': 'application/json',
+ 'rel': 'self'
+ }],
+ 'params': params,
+ 'jobID': execution_id
+ }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+ job_body = construct_job_status(
+ status,
+ created,
+ completed,
+ execution_id,
+ params,
+ host
+ )
+
+ # Construct urls
+ formats = [
+ 'CSV',
+ 'JSON',
+ 'NETCDF'
+ ]
+ data_links = [{
+ 'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+ 'title': 'Download results',
+ 'rel': 'data'
+ } for output_format in formats]
+ job_body['links'].extend(data_links)
+ return job_body
+
+
+def construct_running(status, created, execution_id, params, host):
+ job_body = construct_job_status(
+ status,
+ created,
+ None,
+ execution_id,
+ params,
+ host
+ )
+ job_body['links'].append({
+ 'href': f'{host}/job/cancel?id={execution_id}',
+ 'title': 'Cancel the job',
+ 'rel': 'cancel'
+ })
+ return job_body
+
+
+def construct_error(status, created, completed, execution_id, message, params, host):
+ return construct_job_status(
+ status,
+ created,
+ completed,
+ execution_id,
+ params,
+ host,
+ message
+ )
+
+
+def construct_cancelled(status, created, completed, execution_id, params, host):
+ return construct_job_status(
+ status,
+ created,
+ completed,
+ execution_id,
+ params,
+ host
+ )
+
+
+class NexusExecutionResults:
+ def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
+ params=None, host=None, status_code=200):
+ self.status_code = status_code
+ self.status = status
+ self.created = created
+ self.completed = completed
+ self.execution_id = execution_id
+ self.message = message
+ self.execution_params = params
+ self.host = host
+
+ def toJson(self):
+ params = {
+ 'status': self.status,
+ 'created': self.created,
+ 'execution_id': self.execution_id,
+ 'params': self.execution_params,
+ 'host': self.host
+ }
+ if self.status == ExecutionStatus.SUCCESS:
+ params['completed'] = self.completed
+ construct = construct_done
+ elif self.status == ExecutionStatus.RUNNING:
+ construct = construct_running
+ elif self.status == ExecutionStatus.FAILED:
+ params['completed'] = self.completed
+ params['message'] = self.message
+ construct = construct_error
+ elif self.status == ExecutionStatus.CANCELLED:
+ params['completed'] = self.completed
+ construct = construct_cancelled
+ else:
+ # Raise error -- job state is invalid
+ raise ValueError(f'Unable to fetch status for execution {self.execution_id}')
+
+ job_details = construct(**params)
+ return json.dumps(job_details, indent=4, default=str)
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index 1caf462..0f53ce1 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -95,10 +95,10 @@ RUN python3 setup.py install clean && mamba clean -afy
WORKDIR /incubator-sdap-nexus/tools/deletebyquery
-RUN pip3 install cassandra-driver==3.20.1 --install-option="--no-cython"
+RUN pip3 install cython
+RUN pip3 install cassandra-driver==3.20.1
RUN pip3 install pyspark py4j
RUN pip3 install -r requirements.txt
-RUN pip3 install cython
RUN rm requirements.txt
WORKDIR /incubator-sdap-nexus