Posted to commits@sdap.apache.org by sk...@apache.org on 2023/05/10 00:47:08 UTC

[incubator-sdap-nexus] 01/01: Large job tracking

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-455
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 8725ff6fc35c24436323b89edc5cf9b91901e886
Author: skorper <st...@gmail.com>
AuthorDate: Tue May 9 17:46:54 2023 -0700

    Large job tracking
---
 CHANGELOG.md                                       |   1 +
 .../webservice/algorithms/doms/ExecutionCancel.py  |  83 ++++++++++
 .../webservice/algorithms/doms/ExecutionStatus.py  |  69 ++++++++
 .../webservice/algorithms/doms/ResultsStorage.py   |  61 +++++--
 analysis/webservice/algorithms/doms/__init__.py    |   2 +
 analysis/webservice/algorithms_spark/Matchup.py    | 176 ++++++++++++++-------
 .../NexusCalcSparkTornadoHandler.py}               |  25 +--
 analysis/webservice/apidocs/openapi.yml            |  36 +++++
 .../app_builders/HandlerArgsBuilder.py             |   1 +
 .../request/handlers/NexusRequestHandler.py        |  22 ++-
 .../webservice/webmodel/NexusExecutionResults.py   | 149 +++++++++++++++++
 docker/nexus-webapp/Dockerfile                     |   4 +-
 12 files changed, 535 insertions(+), 94 deletions(-)
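
A minimal client-side sketch of the job-tracking flow this commit introduces.
The host, polling interval, and matchup parameters below are illustrative
assumptions; only the /job and /job/cancel endpoints and the JSON 'status'
field come from the patch itself:

    import time
    import requests

    HOST = 'http://localhost:8083'  # hypothetical SDAP deployment

    def poll_job(execution_id, interval=10):
        """Poll /job until the execution leaves the 'running' state."""
        while True:
            body = requests.get(f'{HOST}/job', params={'id': execution_id}).json()
            if body['status'] != 'running':
                return body
            time.sleep(interval)

    # /match_spark now answers with a redirect to /job?id=<execution_id>;
    # requests follows it, so the final URL carries the execution id:
    #   resp = requests.get(f'{HOST}/match_spark', params=matchup_params)
    #   execution_id = resp.url.split('id=')[-1]
    #   final_status = poll_job(execution_id)
    # A running job can be aborted at any time:
    #   requests.get(f'{HOST}/job/cancel', params={'id': execution_id})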

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f509a7..5e524f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Reduces the time needed to gather tiles to delete, especially when there are many.
 - Added Saildrone's `baja_2018` insitu dataset.
+- SDAP-455: Large job tracking
 ### Changed
 - SDAP-443:
   - Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py
new file mode 100644
index 0000000..c9ef347
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
+from datetime import datetime
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionCancelHandler(NexusCalcSparkTornadoHandler):
+    name = 'Execution Cancel Handler'
+    path = '/job/cancel'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
+        self.tile_service_factory = tile_service_factory
+        self.config = config
+
+    def calc(self, request, tornado_io_loop, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+
+        # Only proceed if status is "running". Otherwise, noop
+        if job_status == ExecutionStatus.RUNNING:
+            # Cancel Spark job
+            self._sc.cancelJobGroup(str(execution_id))
+
+            # Update job status to "cancelled"
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    execution_id,
+                    completeTime=end,
+                    status=ExecutionStatus.CANCELLED.value,
+                    message=None,
+                    stats=None,
+                    results=None
+                )
+
+        # Redirect to job status endpoint
+        request.requestHandler.redirect(f'/job?id={execution_id}')
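
The cancel handler above works because Matchup.async_calc (later in this
commit) tags its Spark work with setJobGroup(execution_id, ...), letting
cancelJobGroup abort everything submitted under that id. A standalone
sketch of that Spark mechanism, with an arbitrary group id and a dummy
slow job standing in for spark_matchup_driver:

    import threading
    import time

    from pyspark import SparkContext

    sc = SparkContext('local[2]', 'cancel-demo')

    def long_job():
        # setJobGroup is per-thread, so tag the job in the thread that
        # submits it -- exactly what async_calc does with the execution id.
        sc.setJobGroup('demo-group', 'cancellable demo job')
        try:
            sc.parallelize(range(8)).map(lambda x: time.sleep(30) or x).collect()
        except Exception as e:
            print('job aborted:', e)

    t = threading.Thread(target=long_job)
    t.start()
    time.sleep(5)
    sc.cancelJobGroup('demo-group')  # the cancel endpoint's core operation
    t.join()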
diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py
new file mode 100644
index 0000000..90dced8
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from . import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+    name = 'Execution Status Handler'
+    path = '/job'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, tile_service_factory, config=None):
+        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+        self.config = config
+
+    def calc(self, request, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        # Check if the job is done
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+                execution_params = retrieval.retrieveParams(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        # TODO: validate the job ID and raise a meaningful error if it's invalid
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+        host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}'
+
+        return NexusExecutionResults.NexusExecutionResults(
+            status=job_status,
+            created=execution_details['timeStarted'],
+            completed=execution_details['timeCompleted'],
+            execution_id=execution_id,
+            message=execution_details['message'],
+            params=execution_params,
+            host=host
+        )
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index c989286..7ca8374 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -98,24 +98,39 @@ class ResultsStorage(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
-        self._log.info('Beginning results write')
+    def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+        """
+        Initial insert into the database for a CDMS matchup request. This
+        populates the executions and params tables.
+        """
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+        execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
         self.__insertParams(execution_id, params)
-        self.__insertStats(execution_id, stats)
-        self.__insertResults(execution_id, results)
-        self._log.info('Results write finished')
         return execution_id
 
-    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+    def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+        self.__updateExecution(execution_id, completeTime, status, message)
+        if stats:
+            self.__insertStats(execution_id, stats)
+        if results:
+            self.__insertResults(execution_id, results)
+
+    def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+        """
+        Insert new entry into execution table
+        """
         if execution_id is None:
             execution_id = uuid.uuid4()
 
-        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
-        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+        return execution_id
+
+    def __updateExecution(self, execution_id, complete_time, status, message=None):
+        cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s"
+        self._session.execute(cql, (complete_time, status, message, execution_id))
         return execution_id
 
     def __insertParams(self, execution_id, params):
@@ -248,7 +263,7 @@ class ResultsRetrieval(AbstractResultsContainer):
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        params = self.__retrieveParams(execution_id)
+        params = self.retrieveParams(execution_id)
         stats = self.__retrieveStats(execution_id)
         data = self.__retrieveData(execution_id, trim_data=trim_data)
         return params, stats, data
@@ -302,7 +317,9 @@ class ResultsRetrieval(AbstractResultsContainer):
                 "depth": row.depth
             }
         for key in row.measurement_values:
-            value = float(row.measurement_values[key])
+            value = row.measurement_values[key]
+            if value is not None:
+                value = float(value)
             entry[key] = value
         return entry
 
@@ -321,7 +338,7 @@ class ResultsRetrieval(AbstractResultsContainer):
 
         raise Exception("Execution not found with id '%s'" % id)
 
-    def __retrieveParams(self, id):
+    def retrieveParams(self, id):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
@@ -341,3 +358,23 @@ class ResultsRetrieval(AbstractResultsContainer):
             return params
 
         raise Exception("Execution not found with id '%s'" % id)
+
+    def retrieveExecution(self, execution_id):
+        """
+        Retrieve execution details from database.
+
+        :param execution_id: Execution ID
+        :return: execution status dictionary
+        """
+
+        cql = "SELECT * FROM doms_executions where id = %s limit 1"
+        rows = self._session.execute(cql, (execution_id,))
+        for row in rows:
+            return {
+                'status': row.status,
+                'message': row.message,
+                'timeCompleted': row.time_completed,
+                'timeStarted': row.time_started
+            }
+
+        raise ValueError(f'Execution not found with id {execution_id}')
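
The new INSERT and UPDATE statements above reference 'status' and 'message'
columns on doms_executions, but the schema migration itself is not part of
this patch (it presumably lives in DomsInitialization). A sketch of what
that change would look like, assuming a 'doms' keyspace and that the
columns do not already exist:

    from cassandra.cluster import Cluster

    cluster = Cluster(['localhost'])   # hypothetical contact point
    session = cluster.connect('doms')  # hypothetical keyspace name

    for stmt in (
        'ALTER TABLE doms_executions ADD status text',
        'ALTER TABLE doms_executions ADD message text',
    ):
        session.execute(stmt)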
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index 9580d38..bc568f8 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -15,6 +15,8 @@
 
 
 from . import BaseDomsHandler
+from . import ExecutionStatus
+from . import ExecutionCancel
 from . import DatasetListQuery
 from . import DomsInitialization
 from . import MatchupQuery
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..fd998e6 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import uuid
 from typing import Optional
 import logging
 import threading
@@ -30,16 +29,17 @@ from pytz import timezone, UTC
 from scipy import spatial
 from shapely import wkt
 from shapely.geometry import box
+import functools
 
 from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 from webservice.algorithms.doms import config as edge_endpoints
 from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
 from webservice.algorithms.doms.ResultsStorage import ResultsStorage
 from webservice.algorithms.doms.insitu import query_insitu as query_edge
 from webservice.algorithms.doms.insitu import query_insitu_schema
 from webservice.webmodel import NexusProcessingException
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
@@ -66,7 +66,7 @@ def iso_time_to_epoch(str_time):
 
 
 @nexus_handler
-class Matchup(NexusCalcSparkHandler):
+class Matchup(NexusCalcSparkTornadoHandler):
     name = "Matchup"
     path = "/match_spark"
     description = "Match measurements between two or more datasets"
@@ -147,7 +147,12 @@ class Matchup(NexusCalcSparkHandler):
     singleton = True
 
     def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
-        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
         self.log = logging.getLogger(__name__)
         self.tile_service_factory = tile_service_factory
         self.config = config
@@ -219,7 +224,66 @@ class Matchup(NexusCalcSparkHandler):
                depth_min, depth_max, time_tolerance, radius_tolerance, \
                platforms, match_once, result_size_limit
 
-    def calc(self, request, **args):
+    def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
+                   secondary_ds_names, parameter_s, start_time, end_time, depth_min,
+                   depth_max, time_tolerance, radius_tolerance, platforms, match_once,
+                   result_size_limit, start):
+        # Call spark_matchup
+        self.log.debug("Calling Spark Driver")
+
+        try:
+            self._sc.setJobGroup(execution_id, execution_id)
+            spark_result = spark_matchup_driver(
+                tile_ids, wkt.dumps(bounding_polygon),
+                primary_ds_name,
+                secondary_ds_names,
+                parameter_s,
+                depth_min,
+                depth_max, time_tolerance,
+                radius_tolerance,
+                platforms,
+                match_once,
+                self.tile_service_factory,
+                sc=self._sc
+            )
+        except Exception as error:
+            self.log.exception(error)
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message=str(error),
+                    stats=None,
+                    results=None
+                )
+            return
+
+        self.log.debug("Building and saving results")
+        end = datetime.utcnow()
+
+        total_keys = len(list(spark_result.keys()))
+        total_values = sum(len(v) for v in spark_result.values())
+        details = {
+            "timeToComplete": int((end - start).total_seconds()),
+            "numSecondaryMatched": total_values,
+            "numPrimaryMatched": total_keys
+        }
+
+        matches = Matchup.convert_to_matches(spark_result)
+
+        with ResultsStorage(self.config) as storage:
+            storage.updateExecution(
+                uuid.UUID(execution_id),
+                completeTime=end,
+                status=ExecutionStatus.SUCCESS.value,
+                message=None,
+                stats=details,
+                results=matches
+            )
+
+    def calc(self, request, tornado_io_loop, **args):
         start = datetime.utcnow()
         # TODO Assuming Satellite primary
         bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
@@ -227,33 +291,6 @@ class Matchup(NexusCalcSparkHandler):
         depth_min, depth_max, time_tolerance, radius_tolerance, \
         platforms, match_once, result_size_limit = self.parse_arguments(request)
 
-        with ResultsStorage(self.config) as resultsStorage:
-
-            execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
-        self.log.debug("Querying for tiles in search domain")
-        # Get tile ids in box
-        tile_ids = [tile.tile_id for tile in
-                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
-                                                             start_seconds_from_epoch, end_seconds_from_epoch,
-                                                             fetch_data=False, fl='id',
-                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
-                                                                   'tile_min_lat asc'], rows=5000)]
-
-        self.log.info('Found %s tile_ids', len(tile_ids))
-        # Call spark_matchup
-        self.log.debug("Calling Spark Driver")
-        try:
-            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
-                                                secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
-                                                radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
-        except Exception as e:
-            self.log.exception(e)
-            raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
-
-        end = datetime.utcnow()
-
-        self.log.debug("Building and saving results")
         args = {
             "primary": primary_ds_name,
             "matchup": secondary_ds_names,
@@ -272,35 +309,60 @@ class Matchup(NexusCalcSparkHandler):
         if depth_max is not None:
             args["depthMax"] = float(depth_max)
 
-        total_keys = len(list(spark_result.keys()))
-        total_values = sum(len(v) for v in spark_result.values())
-        details = {
-            "timeToComplete": int((end - start).total_seconds()),
-            "numSecondaryMatched": total_values,
-            "numPrimaryMatched": total_keys
-        }
 
-        matches = Matchup.convert_to_matches(spark_result)
+        with ResultsStorage(self.config) as resultsStorage:
+            execution_id = str(resultsStorage.insertInitialExecution(
+                params=args,
+                startTime=start,
+                status=ExecutionStatus.RUNNING.value
+            ))
 
-        def do_result_insert():
-            with ResultsStorage(self.config) as storage:
-                storage.insertResults(results=matches, params=args, stats=details,
-                                      startTime=start, completeTime=end, userEmail="",
-                                      execution_id=execution_id)
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
 
-        threading.Thread(target=do_result_insert).start()
+        self.log.info('Found %s tile_ids', len(tile_ids))
 
-        # Get only the first "result_size_limit" results
-        # '0' means returns everything
-        if result_size_limit > 0:
-            matches = matches[0:result_size_limit]
+        if not tile_ids:
+            # No tiles matched; mark the execution failed and skip the Spark job
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message='No tiles matched the provided domain',
+                    stats=None,
+                    results=None
+                )
+            request.requestHandler.redirect(f'/job?id={execution_id}')
+            return
 
-        result = DomsQueryResults(results=matches, args=args,
-                                  details=details, bounds=None,
-                                  count=len(matches), computeOptions=None,
-                                  executionId=execution_id)
+        # Start async processing with Spark. Do not wait for response
+        # before returning to user.
+        tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+            self.async_calc,
+            execution_id=execution_id,
+            tile_ids=tile_ids,
+            bounding_polygon=bounding_polygon,
+            primary_ds_name=primary_ds_name,
+            secondary_ds_names=secondary_ds_names,
+            parameter_s=parameter_s,
+            start_time=start_time,
+            end_time=end_time,
+            depth_min=depth_min,
+            depth_max=depth_max,
+            time_tolerance=time_tolerance,
+            radius_tolerance=radius_tolerance,
+            platforms=platforms,
+            match_once=match_once,
+            result_size_limit=result_size_limit,
+            start=start
+        ))
+
+        request.requestHandler.redirect(f'/job?id={execution_id}')
 
-        return result
 
     @classmethod
     def convert_to_matches(cls, spark_result):
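
Distilled, the pattern Matchup.calc now follows is: schedule the heavy work
on an executor through the IOLoop, deliberately ignore the returned Future,
and answer the request immediately with a redirect. A self-contained Tornado
sketch of that fire-and-forget shape (endpoint name, port, and the fake work
function are illustrative only):

    import time
    import uuid
    from concurrent.futures import ThreadPoolExecutor

    import tornado.ioloop
    import tornado.web

    executor = ThreadPoolExecutor(max_workers=4)

    def heavy_work(job_id):
        time.sleep(10)                   # stands in for spark_matchup_driver
        print(f'job {job_id} finished')  # stands in for updateExecution(...)

    class SubmitHandler(tornado.web.RequestHandler):
        def get(self):
            job_id = str(uuid.uuid4())
            # run_in_executor returns a Future we never await; the redirect
            # is sent while the work continues in the background.
            tornado.ioloop.IOLoop.current().run_in_executor(
                executor, heavy_work, job_id)
            self.redirect(f'/job?id={job_id}')

    if __name__ == '__main__':
        app = tornado.web.Application([(r'/match_spark', SubmitHandler)])
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()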
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
similarity index 61%
copy from analysis/webservice/algorithms/doms/__init__.py
copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
index 9580d38..c2a38c5 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
@@ -13,22 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 
-from . import BaseDomsHandler
-from . import DatasetListQuery
-from . import DomsInitialization
-from . import MatchupQuery
-from . import MetadataQuery
-from . import ResultsPlotQuery
-from . import ResultsRetrieval
-from . import ResultsStorage
-from . import StatsQuery
-from . import ValuesQuery
-from . import config
-from . import datafetch
-from . import fetchedgeimpl
-from . import geo
-from . import insitusubset
-from . import subsetter
-from . import values
-from . import workerthread
+logger = logging.getLogger(__name__)
+
+
+class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler):
+    pass
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..3f3490b 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -646,6 +646,42 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/Error'
+  /job:
+    get:
+      summary: |
+        Get job status
+      operationId: job
+      tags:
+        - Analytics
+      description: "Get job status by execution id"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
+  /job/cancel:
+    get:
+      summary: |
+        Cancel running job
+      operationId: jobCancel
+      tags:
+        - Analytics
+      description: "Cancel running job"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
 externalDocs:
   description: Documentation
   url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html
diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index fed08b9..2a84ae7 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -37,6 +37,7 @@ class HandlerArgsBuilder:
                 class_wrapper == webservice.algorithms_spark.Matchup.Matchup
                 or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms
                 or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler)
+                or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler)
                 or class_wrapper == webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler
                 or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler
         )
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 2645574..95bddf4 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -20,6 +20,7 @@ import tornado.ioloop
 
 from webservice.nexus_tornado.request.renderers import NexusRendererFactory
 from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 
 
 class NexusRequestHandler(tornado.web.RequestHandler):
@@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler):
         # create NexusCalcHandler which will process the request
         instance = self.__clazz(**self._clazz_init_args)
 
+        io_loop = tornado.ioloop.IOLoop.current()
+
         try:
-            # process the request asynchronously on a different thread,
-            # the current tornado handler is still available to get other user requests
-            results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+            if isinstance(instance, NexusCalcSparkTornadoHandler):
+                results = instance.calc(request, io_loop)
+            else:
+                results = yield io_loop.run_in_executor(
+                    self.executor,
+                    instance.calc,
+                    request
+                )
 
             try:
                 self.set_status(results.status_code)
             except AttributeError:
                 pass
 
-            renderer = NexusRendererFactory.get_renderer(request)
-            renderer.render(self, results)
+            # Only render results if there are results to render.
+            # "NexusCalcSparkTornadoHandler" endpoints redirectm so no
+            # need to render.
+            if not isinstance(instance, NexusCalcSparkTornadoHandler):
+                renderer = NexusRendererFactory.get_renderer(request)
+                renderer.render(self, results)
 
         except NexusProcessingException as e:
             self.async_onerror_callback(e.reason, e.code)
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
new file mode 100644
index 0000000..736ba76
--- /dev/null
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    return {
+        'status': job_state.value,
+        'message': message,
+        'createdAt': created,
+        'updatedAt': updated,
+        'links': [{
+            'href': f'{host}/job?id={execution_id}',
+            'title': 'The current page',
+            'type': 'application/json',
+            'rel': 'self'
+        }],
+        'params': params,
+        'jobID': execution_id
+    }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+    # Construct urls
+    formats = [
+        'CSV',
+        'JSON',
+        'NETCDF'
+    ]
+    data_links = [{
+        'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+        'title': 'Download results',
+        'rel': 'data'
+    } for output_format in formats]
+    job_body['links'].extend(data_links)
+    return job_body
+
+
+def construct_running(status, created, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        None,
+        execution_id,
+        params,
+        host
+    )
+    job_body['links'].append({
+        'href': f'{host}/job/cancel?id={execution_id}',
+        'title': 'Cancel the job',
+        'rel': 'cancel'
+    })
+    return job_body
+
+
+def construct_error(status, created, completed, execution_id, message, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host,
+        message
+    )
+
+
+def construct_cancelled(status, created, completed, execution_id, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+
+class NexusExecutionResults:
+    def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
+                 params=None, host=None, status_code=200):
+        self.status_code = status_code
+        self.status = status
+        self.created = created
+        self.completed = completed
+        self.execution_id = execution_id
+        self.message = message
+        self.execution_params = params
+        self.host = host
+
+    def toJson(self):
+        params = {
+            'status': self.status,
+            'created': self.created,
+            'execution_id': self.execution_id,
+            'params': self.execution_params,
+            'host': self.host
+        }
+        if self.status == ExecutionStatus.SUCCESS:
+            params['completed'] = self.completed
+            construct = construct_done
+        elif self.status == ExecutionStatus.RUNNING:
+            construct = construct_running
+        elif self.status == ExecutionStatus.FAILED:
+            params['completed'] = self.completed
+            params['message'] = self.message
+            construct = construct_error
+        elif self.status == ExecutionStatus.CANCELLED:
+            params['completed'] = self.completed
+            construct = construct_cancelled
+        else:
+            # Raise error -- job state is invalid
+            raise ValueError(f'Unable to fetch status for execution {self.execution_id}')
+
+        job_details = construct(**params)
+        return json.dumps(job_details, indent=4, default=str)
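
A quick illustration of the JSON body this module produces, calling the
construct_running() helper defined above directly (the datetime, dataset
name, and host are made up; it assumes the analysis package is importable):

    import json
    from datetime import datetime

    from webservice.webmodel.NexusExecutionResults import (
        ExecutionStatus, construct_running)

    body = construct_running(
        status=ExecutionStatus.RUNNING,
        created=datetime(2023, 5, 9, 17, 46, 54),
        execution_id='c864a51b-3d87-4872-9070-632820b1cae2',
        params={'primary': 'MUR25-JPL-L4-GLOB-v04.2'},
        host='http://localhost:8083',
    )
    print(json.dumps(body, indent=4, default=str))
    # -> 'running' status, createdAt/updatedAt, a 'self' link, a 'cancel' link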
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index 1caf462..0f53ce1 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -95,10 +95,10 @@ RUN python3 setup.py install clean && mamba clean -afy
 
 
 WORKDIR /incubator-sdap-nexus/tools/deletebyquery
-RUN pip3 install cassandra-driver==3.20.1 --install-option="--no-cython"
+RUN pip3 install cython
+RUN pip3 install cassandra-driver==3.20.1
 RUN pip3 install pyspark py4j
 RUN pip3 install -r requirements.txt
-RUN pip3 install cython
 RUN rm requirements.txt
 
 WORKDIR /incubator-sdap-nexus