You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/05/10 00:47:07 UTC

[incubator-sdap-nexus] branch SDAP-455 created (now 8725ff6)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a change to branch SDAP-455
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


      at 8725ff6  Large job tracking

This branch includes the following new commits:

     new 8725ff6  Large job tracking

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-nexus] 01/01: Large job tracking

Posted by sk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-455
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 8725ff6fc35c24436323b89edc5cf9b91901e886
Author: skorper <st...@gmail.com>
AuthorDate: Tue May 9 17:46:54 2023 -0700

    Large job tracking
---
 CHANGELOG.md                                       |   1 +
 .../webservice/algorithms/doms/ExecutionCancel.py  |  83 ++++++++++
 .../webservice/algorithms/doms/ExecutionStatus.py  |  69 ++++++++
 .../webservice/algorithms/doms/ResultsStorage.py   |  61 +++++--
 analysis/webservice/algorithms/doms/__init__.py    |   2 +
 analysis/webservice/algorithms_spark/Matchup.py    | 176 ++++++++++++++-------
 .../NexusCalcSparkTornadoHandler.py}               |  25 +--
 analysis/webservice/apidocs/openapi.yml            |  36 +++++
 .../app_builders/HandlerArgsBuilder.py             |   1 +
 .../request/handlers/NexusRequestHandler.py        |  22 ++-
 .../webservice/webmodel/NexusExecutionResults.py   | 149 +++++++++++++++++
 docker/nexus-webapp/Dockerfile                     |   4 +-
 12 files changed, 535 insertions(+), 94 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f509a7..5e524f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them.
 - Added Saildrone's `baja_2018` insitu dataset.
+- SDAP-455: Large job tracking
 ### Changed
 - SDAP-443:
   - Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py
new file mode 100644
index 0000000..c9ef347
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
+from datetime import datetime
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(NexusCalcSparkTornadoHandler):
+    name = 'Execution Status Handler'
+    path = '/job/cancel'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
+        self.tile_service_factory = tile_service_factory
+        self.config = config
+
+    def calc(self, request, tornado_io_loop, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+
+        # Only proceed if status is "running". Otherwise, noop
+        if job_status == ExecutionStatus.RUNNING:
+            # Cancel Spark job
+            self._sc.cancelJobGroup(str(execution_id))
+
+            # Update job status to "cancelled"
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    execution_id,
+                    completeTime=end,
+                    status=ExecutionStatus.CANCELLED.value,
+                    message=None,
+                    stats=None,
+                    results=None
+                )
+
+        # Redirect to job status endpoint
+        request.requestHandler.redirect(f'/job?id={execution_id}')
diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py
new file mode 100644
index 0000000..90dced8
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from . import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+    name = 'Execution Status Handler'
+    path = '/job'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, tile_service_factory, config=None):
+        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+        self.config = config
+
+    def calc(self, request, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        # Check if the job is done
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+                execution_params = retrieval.retrieveParams(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        # TODO check if job ID is valid. raise error w/meanginful error if not
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+        host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}'
+
+        return NexusExecutionResults.NexusExecutionResults(
+            status=job_status,
+            created=execution_details['timeStarted'],
+            completed=execution_details['timeCompleted'],
+            execution_id=execution_id,
+            message=execution_details['message'],
+            params=execution_params,
+            host=host
+        )
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index c989286..7ca8374 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -98,24 +98,39 @@ class ResultsStorage(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
-        self._log.info('Beginning results write')
+    def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+        """
+        Initial insert into database for CDMS matchup request. This
+        populates the execution and params table.
+        """
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+        execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
         self.__insertParams(execution_id, params)
-        self.__insertStats(execution_id, stats)
-        self.__insertResults(execution_id, results)
-        self._log.info('Results write finished')
         return execution_id
 
-    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+    def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+        self.__updateExecution(execution_id, completeTime, status, message)
+        if stats:
+            self.__insertStats(execution_id, stats)
+        if results:
+            self.__insertResults(execution_id, results)
+
+    def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+        """
+        Insert new entry into execution table
+        """
         if execution_id is None:
             execution_id = uuid.uuid4()
 
-        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
-        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+        return execution_id
+
+    def __updateExecution(self, execution_id, complete_time, status, message=None):
+        cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s"
+        self._session.execute(cql, (complete_time, status, message, execution_id))
         return execution_id
 
     def __insertParams(self, execution_id, params):
@@ -248,7 +263,7 @@ class ResultsRetrieval(AbstractResultsContainer):
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        params = self.__retrieveParams(execution_id)
+        params = self.retrieveParams(execution_id)
         stats = self.__retrieveStats(execution_id)
         data = self.__retrieveData(execution_id, trim_data=trim_data)
         return params, stats, data
@@ -302,7 +317,9 @@ class ResultsRetrieval(AbstractResultsContainer):
                 "depth": row.depth
             }
         for key in row.measurement_values:
-            value = float(row.measurement_values[key])
+            value = (row.measurement_values[key])
+            if value is not None:
+                value = float(value)
             entry[key] = value
         return entry
 
@@ -321,7 +338,7 @@ class ResultsRetrieval(AbstractResultsContainer):
 
         raise Exception("Execution not found with id '%s'" % id)
 
-    def __retrieveParams(self, id):
+    def retrieveParams(self, id):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
@@ -341,3 +358,23 @@ class ResultsRetrieval(AbstractResultsContainer):
             return params
 
         raise Exception("Execution not found with id '%s'" % id)
+
+    def retrieveExecution(self, execution_id):
+        """
+        Retrieve execution details from database.
+
+        :param execution_id: Execution ID
+        :return: execution status dictionary
+        """
+
+        cql = "SELECT * FROM doms_executions where id = %s limit 1"
+        rows = self._session.execute(cql, (execution_id,))
+        for row in rows:
+            return {
+                'status': row.status,
+                'message': row.message,
+                'timeCompleted': row.time_completed,
+                'timeStarted': row.time_started
+            }
+
+        raise ValueError('Execution not found with id %s', execution_id)
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index 9580d38..bc568f8 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -15,6 +15,8 @@
 
 
 from . import BaseDomsHandler
+from . import ExecutionStatus
+from . import ExecutionCancel
 from . import DatasetListQuery
 from . import DomsInitialization
 from . import MatchupQuery
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..fd998e6 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import uuid
 from typing import Optional
 import logging
 import threading
@@ -30,16 +29,17 @@ from pytz import timezone, UTC
 from scipy import spatial
 from shapely import wkt
 from shapely.geometry import box
+import functools
 
 from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 from webservice.algorithms.doms import config as edge_endpoints
 from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
 from webservice.algorithms.doms.ResultsStorage import ResultsStorage
 from webservice.algorithms.doms.insitu import query_insitu as query_edge
 from webservice.algorithms.doms.insitu import query_insitu_schema
 from webservice.webmodel import NexusProcessingException
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
@@ -66,7 +66,7 @@ def iso_time_to_epoch(str_time):
 
 
 @nexus_handler
-class Matchup(NexusCalcSparkHandler):
+class Matchup(NexusCalcSparkTornadoHandler):
     name = "Matchup"
     path = "/match_spark"
     description = "Match measurements between two or more datasets"
@@ -147,7 +147,12 @@ class Matchup(NexusCalcSparkHandler):
     singleton = True
 
     def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
-        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
         self.log = logging.getLogger(__name__)
         self.tile_service_factory = tile_service_factory
         self.config = config
@@ -219,7 +224,66 @@ class Matchup(NexusCalcSparkHandler):
                depth_min, depth_max, time_tolerance, radius_tolerance, \
                platforms, match_once, result_size_limit
 
-    def calc(self, request, **args):
+    def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
+                   secondary_ds_names, parameter_s, start_time, end_time, depth_min,
+                   depth_max, time_tolerance, radius_tolerance, platforms, match_once,
+                   result_size_limit, start):
+        # Call spark_matchup
+        self.log.debug("Calling Spark Driver")
+
+        try:
+            self._sc.setJobGroup(execution_id, execution_id)
+            spark_result = spark_matchup_driver(
+                tile_ids, wkt.dumps(bounding_polygon),
+                primary_ds_name,
+                secondary_ds_names,
+                parameter_s,
+                depth_min,
+                depth_max, time_tolerance,
+                radius_tolerance,
+                platforms,
+                match_once,
+                self.tile_service_factory,
+                sc=self._sc
+            )
+        except Exception as error:
+            self.log.exception(error)
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message=error,
+                    stats=None,
+                    results=None
+                )
+            return
+
+        self.log.debug("Building and saving results")
+        end = datetime.utcnow()
+
+        total_keys = len(list(spark_result.keys()))
+        total_values = sum(len(v) for v in spark_result.values())
+        details = {
+            "timeToComplete": int((end - start).total_seconds()),
+            "numSecondaryMatched": total_values,
+            "numPrimaryMatched": total_keys
+        }
+
+        matches = Matchup.convert_to_matches(spark_result)
+
+        with ResultsStorage(self.config) as storage:
+            storage.updateExecution(
+                uuid.UUID(execution_id),
+                completeTime=end,
+                status=ExecutionStatus.SUCCESS.value,
+                message=None,
+                stats=details,
+                results=matches
+            )
+
+    def calc(self, request, tornado_io_loop, **args):
         start = datetime.utcnow()
         # TODO Assuming Satellite primary
         bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
@@ -227,33 +291,6 @@ class Matchup(NexusCalcSparkHandler):
         depth_min, depth_max, time_tolerance, radius_tolerance, \
         platforms, match_once, result_size_limit = self.parse_arguments(request)
 
-        with ResultsStorage(self.config) as resultsStorage:
-
-            execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
-        self.log.debug("Querying for tiles in search domain")
-        # Get tile ids in box
-        tile_ids = [tile.tile_id for tile in
-                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
-                                                             start_seconds_from_epoch, end_seconds_from_epoch,
-                                                             fetch_data=False, fl='id',
-                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
-                                                                   'tile_min_lat asc'], rows=5000)]
-
-        self.log.info('Found %s tile_ids', len(tile_ids))
-        # Call spark_matchup
-        self.log.debug("Calling Spark Driver")
-        try:
-            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
-                                                secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
-                                                radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
-        except Exception as e:
-            self.log.exception(e)
-            raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
-
-        end = datetime.utcnow()
-
-        self.log.debug("Building and saving results")
         args = {
             "primary": primary_ds_name,
             "matchup": secondary_ds_names,
@@ -272,35 +309,60 @@ class Matchup(NexusCalcSparkHandler):
         if depth_max is not None:
             args["depthMax"] = float(depth_max)
 
-        total_keys = len(list(spark_result.keys()))
-        total_values = sum(len(v) for v in spark_result.values())
-        details = {
-            "timeToComplete": int((end - start).total_seconds()),
-            "numSecondaryMatched": total_values,
-            "numPrimaryMatched": total_keys
-        }
 
-        matches = Matchup.convert_to_matches(spark_result)
+        with ResultsStorage(self.config) as resultsStorage:
+            execution_id = str(resultsStorage.insertInitialExecution(
+                params=args,
+                startTime=start,
+                status=ExecutionStatus.RUNNING.value
+            ))
 
-        def do_result_insert():
-            with ResultsStorage(self.config) as storage:
-                storage.insertResults(results=matches, params=args, stats=details,
-                                      startTime=start, completeTime=end, userEmail="",
-                                      execution_id=execution_id)
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
 
-        threading.Thread(target=do_result_insert).start()
+        self.log.info('Found %s tile_ids', len(tile_ids))
 
-        # Get only the first "result_size_limit" results
-        # '0' means returns everything
-        if result_size_limit > 0:
-            matches = matches[0:result_size_limit]
+        if not tile_ids:
+            # There are no matching tiles
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message='No tiles matched the provided domain'
+                )
 
-        result = DomsQueryResults(results=matches, args=args,
-                                  details=details, bounds=None,
-                                  count=len(matches), computeOptions=None,
-                                  executionId=execution_id)
+        # Start async processing with Spark. Do not wait for response
+        # before returning to user.
+        tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+            self.async_calc,
+            execution_id=execution_id,
+            tile_ids=tile_ids,
+            bounding_polygon=bounding_polygon,
+            primary_ds_name=primary_ds_name,
+            secondary_ds_names=secondary_ds_names,
+            parameter_s=parameter_s,
+            start_time=start_time,
+            end_time=end_time,
+            depth_min=depth_min,
+            depth_max=depth_max,
+            time_tolerance=time_tolerance,
+            radius_tolerance=radius_tolerance,
+            platforms=platforms,
+            match_once=match_once,
+            result_size_limit=result_size_limit,
+            start=start
+        ))
+
+        request.requestHandler.redirect(f'/job?id={execution_id}')
 
-        return result
 
     @classmethod
     def convert_to_matches(cls, spark_result):
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
similarity index 61%
copy from analysis/webservice/algorithms/doms/__init__.py
copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
index 9580d38..c2a38c5 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
@@ -13,22 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 
-from . import BaseDomsHandler
-from . import DatasetListQuery
-from . import DomsInitialization
-from . import MatchupQuery
-from . import MetadataQuery
-from . import ResultsPlotQuery
-from . import ResultsRetrieval
-from . import ResultsStorage
-from . import StatsQuery
-from . import ValuesQuery
-from . import config
-from . import datafetch
-from . import fetchedgeimpl
-from . import geo
-from . import insitusubset
-from . import subsetter
-from . import values
-from . import workerthread
+logger = logging.getLogger(__name__)
+
+
+class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler):
+    pass
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..3f3490b 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -646,6 +646,42 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/Error'
+  /job:
+    get:
+      summary: |
+        Get job status
+      operationId: job
+      tags:
+        - Analytics
+      description: "Get job status by execution id"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
+  /job/cancel:
+    get:
+      summary: |
+        Cancel running job
+      operationId: jobCancel
+      tags:
+        - Analytics
+      description: "Cancel running job"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
 externalDocs:
   description: Documentation
   url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html
diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index fed08b9..2a84ae7 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -37,6 +37,7 @@ class HandlerArgsBuilder:
                 class_wrapper == webservice.algorithms_spark.Matchup.Matchup
                 or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms
                 or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler)
+                or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler)
                 or class_wrapper == webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler
                 or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler
         )
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 2645574..95bddf4 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -20,6 +20,7 @@ import tornado.ioloop
 
 from webservice.nexus_tornado.request.renderers import NexusRendererFactory
 from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 
 
 class NexusRequestHandler(tornado.web.RequestHandler):
@@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler):
         # create NexusCalcHandler which will process the request
         instance = self.__clazz(**self._clazz_init_args)
 
+        io_loop = tornado.ioloop.IOLoop.current()
+
         try:
-            # process the request asynchronously on a different thread,
-            # the current tornado handler is still available to get other user requests
-            results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+            if isinstance(instance, NexusCalcSparkTornadoHandler):
+                results = instance.calc(request, io_loop)
+            else:
+                results = yield io_loop.run_in_executor(
+                    self.executor,
+                    instance.calc,
+                    request
+                )
 
             try:
                 self.set_status(results.status_code)
             except AttributeError:
                 pass
 
-            renderer = NexusRendererFactory.get_renderer(request)
-            renderer.render(self, results)
+            # Only render results if there are results to render.
+            # "NexusCalcSparkTornadoHandler" endpoints redirectm so no
+            # need to render.
+            if not isinstance(instance, NexusCalcSparkTornadoHandler):
+                renderer = NexusRendererFactory.get_renderer(request)
+                renderer.render(self, results)
 
         except NexusProcessingException as e:
             self.async_onerror_callback(e.reason, e.code)
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
new file mode 100644
index 0000000..736ba76
--- /dev/null
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    return {
+        'status': job_state.value,
+        'message': message,
+        'createdAt': created,
+        'updatedAt': updated,
+        'links': [{
+            'href': f'{host}/job?id={execution_id}',
+            'title': 'The current page',
+            'type': 'application/json',
+            'rel': 'self'
+        }],
+        'params': params,
+        'jobID': execution_id
+    }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+    # Construct urls
+    formats = [
+        'CSV',
+        'JSON',
+        'NETCDF'
+    ]
+    data_links = [{
+        'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+        'title': 'Download results',
+        'rel': 'data'
+    } for output_format in formats]
+    job_body['links'].extend(data_links)
+    return job_body
+
+
+def construct_running(status, created, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        None,
+        execution_id,
+        params,
+        host
+    )
+    job_body['links'].append({
+        'href': f'{host}/job/cancel?id={execution_id}',
+        'title': 'Cancel the job',
+        'rel': 'cancel'
+    })
+    return job_body
+
+
+def construct_error(status, created, completed, execution_id, message, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host,
+        message
+    )
+
+
+def construct_cancelled(status, created, completed, execution_id, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+
+class NexusExecutionResults:
+    def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
+                 params=None, host=None, status_code=200):
+        self.status_code = status_code
+        self.status = status
+        self.created = created
+        self.completed = completed
+        self.execution_id = execution_id
+        self.message = message
+        self.execution_params = params
+        self.host = host
+
+    def toJson(self):
+        params = {
+            'status': self.status,
+            'created': self.created,
+            'execution_id': self.execution_id,
+            'params': self.execution_params,
+            'host': self.host
+        }
+        if self.status == ExecutionStatus.SUCCESS:
+            params['completed'] = self.completed
+            construct = construct_done
+        elif self.status == ExecutionStatus.RUNNING:
+            construct = construct_running
+        elif self.status == ExecutionStatus.FAILED:
+            params['completed'] = self.completed
+            params['message'] = self.message
+            construct = construct_error
+        elif self.status == ExecutionStatus.CANCELLED:
+            params['completed'] = self.completed
+            construct = construct_cancelled
+        else:
+            # Raise error -- job state is invalid
+            raise ValueError('Unable to fetch status for execution {}', self.execution_id)
+
+        job_details = construct(**params)
+        return json.dumps(job_details, indent=4, default=str)
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index 1caf462..0f53ce1 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -95,10 +95,10 @@ RUN python3 setup.py install clean && mamba clean -afy
 
 
 WORKDIR /incubator-sdap-nexus/tools/deletebyquery
-RUN pip3 install cassandra-driver==3.20.1 --install-option="--no-cython"
+RUN pip3 install cython
+RUN pip3 install cassandra-driver==3.20.1
 RUN pip3 install pyspark py4j
 RUN pip3 install -r requirements.txt
-RUN pip3 install cython
 RUN rm requirements.txt
 
 WORKDIR /incubator-sdap-nexus