Posted to commits@sdap.apache.org by sk...@apache.org on 2023/08/21 17:49:34 UTC

[incubator-sdap-nexus] branch master updated: SDAP-455: Large Job Tracking (#249)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 3071f18  SDAP-455: Large Job Tracking (#249)
3071f18 is described below

commit 3071f18acc9afd82b8d82cc8de28876e726ea963
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Mon Aug 21 10:49:28 2023 -0700

    SDAP-455: Large Job Tracking (#249)
    
    * Large job tracking
    
    * updateExecution function updated so the execution status is not changed to success until after all results are inserted
    
    * cast exception to string when writing job failure to db
    
    * Fix bug where NetCDF is not generated for sat-to-sat matches
    
    * fix typo
    
    * pin numpy version due to python 3.9+ req
    
    * remove invalid comment
    
    * Only update execution status if job is in running state
    
    * reorder cancel operation. First update DB, then cancel spark job group
    
    * SDAP-467: Pagination (#261)
    
    * pagination
    
    * update openapi spec
    
    * Fix for cdmslist bug in pagination PR (#263)
    
    * Fix for cdmslist bug
    
    * Fix for custom results type for cdmslist query
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
    
    * SDAP-473: Job prioritization (#262)
    
    * Update changelog
    
    * job prioritization
    
    * fix typo
    
    * Add check for null __details
    
    * remove unnecessary print statement
    
    ---------
    
    Co-authored-by: Riley Kuttruff <72...@users.noreply.github.com>
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
    
    * added type to execution details results links
    
    * added status field to doms table initializer
    
    * fix prioritize distance
    
    * fixed bug where no primary tile matches resulted in error
    
    ---------
    
    Co-authored-by: Riley Kuttruff <72...@users.noreply.github.com>
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 CHANGELOG.md                                       |   5 +-
 analysis/setup.py                                  |   3 +-
 .../webservice/algorithms/doms/BaseDomsHandler.py  |  19 +-
 .../webservice/algorithms/doms/DatasetListQuery.py |  14 +-
 .../algorithms/doms/DomsInitialization.py          |   4 +-
 .../webservice/algorithms/doms/ExecutionCancel.py  |  83 +++++++++
 .../webservice/algorithms/doms/ExecutionStatus.py  |  67 +++++++
 .../webservice/algorithms/doms/ResultsRetrieval.py |   6 +-
 .../webservice/algorithms/doms/ResultsStorage.py   |  78 ++++++---
 analysis/webservice/algorithms/doms/__init__.py    |   2 +
 analysis/webservice/algorithms_spark/Matchup.py    | 193 ++++++++++++++-------
 .../NexusCalcSparkTornadoHandler.py                |  15 +-
 analysis/webservice/apidocs/openapi.yml            |  52 ++++++
 analysis/webservice/config/scheduler.xml           |  10 ++
 .../app_builders/HandlerArgsBuilder.py             |   1 +
 .../app_builders/SparkContextBuilder.py            |   9 +-
 .../request/handlers/NexusRequestHandler.py        |  22 ++-
 .../webservice/webmodel/NexusExecutionResults.py   | 150 ++++++++++++++++
 data-access/requirements.txt                       |   1 +
 19 files changed, 630 insertions(+), 104 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 147dac9..14b2576 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## Unreleased
+## [Unreleased]
 ### Added
+- SDAP-467: Added pagination to `/cdmsresults` endpoint
 - SDAP-461: Added 4 remaining Saildrone insitu datasets.
+- SDAP-473: Added support for matchup job prioritization
 ### Changed
 - SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`.
   - **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py`
@@ -14,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
     - Made the `output` parameter case-insensitive
     - Improved speed of results insert
     - Updated `id` field of insitu points to include depth. This solves an issue with NetCDF result rendering where different insitu observations at the same place & time but at different depths were being excluded for having the same `id`.
+- SDAP-455: Large job tracking
 - SDAP-466: Matchup now defines secondary `platform` fields with `platform.id` if it is available and not blank. It then uses `platform.code` and `platform.type` as fallbacks, then just the value of `platform` if none work.
 - SDAP-468: Updated matchup output filename
 - SDAP-482: Updated Saildrone in situ endpoint in config file
diff --git a/analysis/setup.py b/analysis/setup.py
index 99cd707..8fbc617 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -60,7 +60,8 @@ setuptools.setup(
             'config/algorithms.ini',
             'apidocs/index.html',
             'apidocs/openapi.yml',
-            'apidocs/dataset-populate.js'
+            'apidocs/dataset-populate.js',
+            'config/scheduler.xml'
         ],
         'webservice.algorithms.doms': ['domsconfig.ini.default'],
     },
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index af2e6d3..84c9163 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -85,7 +85,7 @@ class DomsEncoder(json.JSONEncoder):
 
 class DomsQueryResults(NexusResults):
     def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
-                 executionId=None, status_code=200):
+                 executionId=None, status_code=200, page_num=None, page_size=None):
         NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
                               status_code=status_code)
         self.__args = args
@@ -94,6 +94,13 @@ class DomsQueryResults(NexusResults):
         self.__details = details
         self.__executionId = str(executionId)
 
+        if self.__details is None:
+            self.__details = {}
+
+        # Add page num and size to details block
+        self.__details['pageNum'] = page_num
+        self.__details['pageSize'] = page_size
+
     def toJson(self):
         bounds = self.__bounds.toMap() if self.__bounds is not None else {}
         return json.dumps(
@@ -279,6 +286,9 @@ class DomsCSVFormatter:
             {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
 
             {"Global Attribute": "URI_Matchup", "Value": "https://doms.jpl.nasa.gov/domsresults?id=" + executionId + "&output=CSV"}, # TODO how to replace with actual req URL
+
+            {"Global Attribute": "CDMS_page_num", "Value": details["pageNum"]},
+            {"Global Attribute": "CDMS_page_size", "Value": details["pageSize"]},
         ]
 
         writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
@@ -329,6 +339,8 @@ class DomsNetCDFFormatter:
         dataset.CDMS_primary = params["primary"]
         dataset.CDMS_time_to_complete = details["timeToComplete"]
         dataset.CDMS_time_to_complete_units = "seconds"
+        dataset.CDMS_page_num = details["pageNum"]
+        dataset.CDMS_page_size = details["pageSize"]
 
         insituDatasets = params["matchup"]
         insituLinks = set()
@@ -408,7 +420,10 @@ class DomsNetCDFFormatter:
 
             # Add each match only if it is not already in the array of in situ points
             for match in result["matches"]:
-                key = (match['id'], f'{match["depth"]:.4}')
+                depth_str = ''
+                if match['depth'] is not None:
+                    depth_str = f'{match["depth"]:.4}'
+                key = (match['id'], depth_str)
 
                 if key not in ids:
                     ids[key] = insituIndex
diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py
index 78ba6ab..7c40d18 100644
--- a/analysis/webservice/algorithms/doms/DatasetListQuery.py
+++ b/analysis/webservice/algorithms/doms/DatasetListQuery.py
@@ -23,7 +23,7 @@ from . import config
 from . import values
 from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
 from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import cached
+from webservice.webmodel import cached, NexusResults
 
 
 @nexus_handler
@@ -120,4 +120,14 @@ class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
             "insitu": insituList
         }
 
-        return BaseDomsHandler.DomsQueryResults(results=values)
+        return DatasetListResults(results=values)
+
+
+class DatasetListResults(NexusResults):
+    def __init__(self, results=None):
+        NexusResults.__init__(self, results=results)
+
+        self.__results = results
+
+    def toJson(self):
+        return json.dumps({'data': self.__results}, indent=4)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index b695c9e..ed7db20 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -120,7 +120,9 @@ class DomsInitializer:
               id uuid PRIMARY KEY,
               time_started timestamp,
               time_completed timestamp,
-              user_email text
+              user_email text,
+              status text,
+              message text
             );
                 """
         session.execute(cql)
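
The CREATE TABLE above only takes effect on fresh installs. For an existing
deployment the new column(s) would likely need to be added by hand; a minimal
sketch using cassandra-driver, where the contact point and keyspace name are
assumptions rather than values taken from this commit:

    from cassandra.cluster import Cluster

    # Hedged sketch: contact point and keyspace name are assumptions.
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('doms')
    # 'status' is added by this commit; 'message' is written and read by
    # ResultsStorage.py below.
    session.execute('ALTER TABLE doms_executions ADD status text')
    session.execute('ALTER TABLE doms_executions ADD message text')
    cluster.shutdown()
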
diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py
new file mode 100644
index 0000000..466428c
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
+from datetime import datetime
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionCancelHandler(NexusCalcSparkTornadoHandler):
+    name = 'Execution Cancel Handler'
+    path = '/job/cancel'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
+        self.tile_service_factory = tile_service_factory
+        self.config = config
+
+    def calc(self, request, tornado_io_loop, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+
+        # Only proceed if status is "running". Otherwise, noop
+        if job_status == ExecutionStatus.RUNNING:
+            # Update job status to "cancelled"
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    execution_id,
+                    completeTime=end,
+                    status=ExecutionStatus.CANCELLED.value,
+                    message=None,
+                    stats=None,
+                    results=None
+                )
+
+            # Cancel Spark job
+            self._sc.cancelJobGroup(str(execution_id))
+
+        # Redirect to job status endpoint
+        request.requestHandler.redirect(f'/job?id={execution_id}')
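
A minimal client-side sketch of cancelling a job through this endpoint. The
host and port are assumptions, the id is the example uuid from the API docs
below, and requests follows the redirect to /job automatically (assuming the
default JSON renderer):

    import requests

    # Hedged sketch: host/port are assumptions.
    host = 'http://localhost:8083'
    execution_id = 'c864a51b-3d87-4872-9070-632820b1cae2'
    resp = requests.get(host + '/job/cancel', params={'id': execution_id})
    print(resp.json()['status'])  # 'cancelled' if the job was still running
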
diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py
new file mode 100644
index 0000000..1bae455
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+from . import BaseDomsHandler
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
+from webservice.webmodel import NexusExecutionResults
+from webservice.webmodel import NexusProcessingException
+
+
+@nexus_handler
+class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
+    name = 'Execution Status Handler'
+    path = '/job'
+    description = ''
+    params = {}
+    singleton = True
+
+    def __init__(self, tile_service_factory, config=None):
+        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
+        self.config = config
+
+    def calc(self, request, **args):
+        execution_id = request.get_argument('id', None)
+
+        try:
+            execution_id = uuid.UUID(execution_id)
+        except ValueError:
+            raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400)
+
+        # Check if the job is done
+        with ResultsRetrieval(self.config) as retrieval:
+            try:
+                execution_details = retrieval.retrieveExecution(execution_id)
+                execution_params = retrieval.retrieveParams(execution_id)
+            except ValueError:
+                raise NexusProcessingException(
+                    reason=f'Execution {execution_id} not found',
+                    code=404
+                )
+
+        job_status = NexusExecutionResults.ExecutionStatus(execution_details['status'])
+        host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}'
+
+        return NexusExecutionResults.NexusExecutionResults(
+            status=job_status,
+            created=execution_details['timeStarted'],
+            completed=execution_details['timeCompleted'],
+            execution_id=execution_id,
+            message=execution_details['message'],
+            params=execution_params,
+            host=host
+        )
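
Because /match_spark now redirects here instead of blocking, a client can
poll this endpoint until the job reaches a terminal state. A hedged sketch,
where the host is an assumption and the status strings come from
ExecutionStatus in NexusExecutionResults.py below:

    import time

    import requests

    # Hedged sketch: poll /job until the execution is no longer 'running'.
    def wait_for_job(host, execution_id, interval=10):
        while True:
            body = requests.get(host + '/job', params={'id': execution_id}).json()
            if body['status'] in ('success', 'failed', 'cancelled'):
                return body
            time.sleep(interval)
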
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
index c3b95b0..f03c1ca 100644
--- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -35,6 +35,8 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
 
     def calc(self, computeOptions, **args):
         execution_id = computeOptions.get_argument("id", None)
+        page_num = computeOptions.get_int_arg('pageNum', default=1)
+        page_size = computeOptions.get_int_arg('pageSize', default=1000)
 
         try:
             execution_id = uuid.UUID(execution_id)
@@ -44,7 +46,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
         simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)
 
         with ResultsStorage.ResultsRetrieval(self.config) as storage:
-            params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
+            params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results, page_num=page_num, page_size=page_size)
 
         return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data),
-                                                computeOptions=None, executionId=execution_id)
+                                                computeOptions=None, executionId=execution_id, page_num=page_num, page_size=page_size)
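
With these defaults, /cdmsresults returns only the first 1000 primary matches
unless pageNum/pageSize are supplied. A hedged sketch of paging through a
stored result set; the host is an assumption, and so is the 'data' key in the
JSON body:

    import requests

    # Hedged sketch: iterate over every page of a stored matchup result.
    def iter_results(host, execution_id, page_size=1000):
        page_num = 1
        while True:
            body = requests.get(host + '/cdmsresults', params={
                'id': execution_id,
                'output': 'JSON',
                'pageNum': page_num,
                'pageSize': page_size,
            }).json()
            page = body.get('data', [])  # 'data' key is an assumption
            yield from page
            if len(page) < page_size:
                break
            page_num += 1
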
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index d0ead7e..6b210c0 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -26,7 +26,7 @@ import pkg_resources
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
-from cassandra.query import BatchStatement
+from cassandra.query import BatchStatement, SimpleStatement
 from pytz import UTC
 from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
 from webservice.webmodel import NexusProcessingException
@@ -106,24 +106,40 @@ class ResultsStorage(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
-        self._log.info('Beginning results write')
+    def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+        """
+        Initial insert into database for CDMS matchup request. This
+        populates the execution and params table.
+        """
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+        execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
         self.__insertParams(execution_id, params)
-        self.__insertStats(execution_id, stats)
-        self.__insertResults(execution_id, results)
-        self._log.info('Results write finished')
         return execution_id
 
-    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+    def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+        if stats:
+            self.__insertStats(execution_id, stats)
+        if results:
+            self.__insertResults(execution_id, results)
+        self.__updateExecution(execution_id, completeTime, status, message)
+
+    def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+        """
+        Insert new entry into execution table
+        """
         if execution_id is None:
             execution_id = uuid.uuid4()
 
-        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
-        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+        return execution_id
+
+    def __updateExecution(self, execution_id, complete_time, status, message=None):
+        # Only update the status if it's "running". Any other state is immutable.
+        cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s IF status = 'running'"
+        self._session.execute(cql, (complete_time, status, message, execution_id))
         return execution_id
 
     def __insertParams(self, execution_id, params):
@@ -259,17 +275,17 @@ class ResultsRetrieval(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def retrieveResults(self, execution_id, trim_data=False):
+    def retrieveResults(self, execution_id, trim_data=False, page_num=1, page_size=1000):
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        params = self.__retrieveParams(execution_id)
+        params = self.retrieveParams(execution_id)
         stats = self.__retrieveStats(execution_id)
-        data = self.__retrieveData(execution_id, trim_data=trim_data)
+        data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size)
         return params, stats, data
 
-    def __retrieveData(self, id, trim_data=False):
-        dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
+    def __retrieveData(self, id, trim_data=False, page_num=1, page_size=1000):
+        dataMap = self.__retrievePrimaryData(id, trim_data=trim_data, page_num=page_num, page_size=page_size)
         self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
         data = [dataMap[name] for name in dataMap]
         return data
@@ -284,15 +300,13 @@ class ResultsRetrieval(AbstractResultsContainer):
                 if not "matches" in dataMap[row.primary_value_id]:
                     dataMap[row.primary_value_id]["matches"] = []
                 dataMap[row.primary_value_id]["matches"].append(entry)
-            else:
-                print(row)
 
-    def __retrievePrimaryData(self, id, trim_data=False):
-        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
-        rows = self._session.execute(cql, (id,))
+    def __retrievePrimaryData(self, id, trim_data=False, page_num=1, page_size=1000):
+        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true limit %s"
+        rows = self._session.execute(cql, [id, page_num * page_size])
 
         dataMap = {}
-        for row in rows:
+        for row in rows[(page_num-1)*page_size:page_num*page_size]:
             entry = self.__rowToDataEntry(row, trim_data=trim_data)
             dataMap[row.value_id] = entry
         return dataMap
@@ -343,7 +357,7 @@ class ResultsRetrieval(AbstractResultsContainer):
 
         raise Exception("Execution not found with id '%s'" % id)
 
-    def __retrieveParams(self, id):
+    def retrieveParams(self, id):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
@@ -368,3 +382,23 @@ class ResultsRetrieval(AbstractResultsContainer):
             return params
 
         raise Exception("Execution not found with id '%s'" % id)
+
+    def retrieveExecution(self, execution_id):
+        """
+        Retrieve execution details from database.
+
+        :param execution_id: Execution ID
+        :return: execution status dictionary
+        """
+
+        cql = "SELECT * FROM doms_executions where id = %s limit 1"
+        rows = self._session.execute(cql, (execution_id,))
+        for row in rows:
+            return {
+                'status': row.status,
+                'message': row.message,
+                'timeCompleted': row.time_completed,
+                'timeStarted': row.time_started
+            }
+
+        raise ValueError(f'Execution not found with id {execution_id}')
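
The IF status = 'running' guard in __updateExecution above is a Cassandra
lightweight transaction: once an execution reaches a terminal state, later
updates (for example, a cancel racing against job completion) are silently
rejected. A hedged sketch, not part of this commit, of how a caller could
observe whether the conditional UPDATE actually applied using the driver's
was_applied flag:

    # Hedged sketch: a variant of __updateExecution that reports whether
    # the conditional UPDATE took effect.
    def update_execution_checked(session, execution_id, complete_time, status, message=None):
        cql = ("UPDATE doms_executions SET time_completed = %s, status = %s, message = %s "
               "WHERE id = %s IF status = 'running'")
        result = session.execute(cql, (complete_time, status, message, execution_id))
        return result.was_applied  # False if the row was already terminal
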
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
index 9580d38..bc568f8 100644
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -15,6 +15,8 @@
 
 
 from . import BaseDomsHandler
+from . import ExecutionStatus
+from . import ExecutionCancel
 from . import DatasetListQuery
 from . import DomsInitialization
 from . import MatchupQuery
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index f27612a..a55f61d 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import uuid
 from typing import Optional
 import logging
 import threading
@@ -30,20 +29,23 @@ from pytz import timezone, UTC
 from scipy import spatial
 from shapely import wkt
 from shapely.geometry import box
+import functools
 
 from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 from webservice.algorithms.doms import config as edge_endpoints
 from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
 from webservice.algorithms.doms.ResultsStorage import ResultsStorage
 from webservice.algorithms.doms.insitu import query_insitu as query_edge
 from webservice.algorithms.doms.insitu import query_insitu_schema
 from webservice.webmodel import NexusProcessingException
+from webservice.webmodel.NexusExecutionResults import ExecutionStatus
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
+LARGE_JOB_THRESHOLD = 4000
+
 
 class Schema:
     def __init__(self):
@@ -66,7 +68,7 @@ def iso_time_to_epoch(str_time):
 
 
 @nexus_handler
-class Matchup(NexusCalcSparkHandler):
+class Matchup(NexusCalcSparkTornadoHandler):
     name = "Matchup"
     path = "/match_spark"
     description = "Match measurements between two or more datasets"
@@ -154,7 +156,12 @@ class Matchup(NexusCalcSparkHandler):
     singleton = True
 
     def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
-        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
+        NexusCalcSparkTornadoHandler.__init__(
+            self,
+            algorithm_config=algorithm_config,
+            sc=sc,
+            tile_service_factory=tile_service_factory
+        )
         self.log = logging.getLogger(__name__)
         self.tile_service_factory = tile_service_factory
         self.config = config
@@ -229,42 +236,82 @@ class Matchup(NexusCalcSparkHandler):
                depth_min, depth_max, time_tolerance, radius_tolerance, \
                platforms, match_once, result_size_limit, prioritize_distance
 
-    def calc(self, request, **args):
-        start = datetime.utcnow()
-        # TODO Assuming Satellite primary
-        bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
-        start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
-        depth_min, depth_max, time_tolerance, radius_tolerance, \
-        platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
-
-        with ResultsStorage(self.config) as resultsStorage:
+    def get_job_pool(self, tile_ids):
+        if len(tile_ids) > LARGE_JOB_THRESHOLD:
+            return 'large'
+        return 'small'
 
-            execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
-        self.log.debug("Querying for tiles in search domain")
-        # Get tile ids in box
-        tile_ids = [tile.tile_id for tile in
-                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
-                                                             start_seconds_from_epoch, end_seconds_from_epoch,
-                                                             fetch_data=False, fl='id',
-                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
-                                                                   'tile_min_lat asc'], rows=5000)]
-
-        self.log.info('Found %s tile_ids', len(tile_ids))
+    def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
+                   secondary_ds_names, parameter_s, start_time, end_time, depth_min,
+                   depth_max, time_tolerance, radius_tolerance, platforms, match_once,
+                   result_size_limit, start, prioritize_distance):
         # Call spark_matchup
         self.log.debug("Calling Spark Driver")
+
+        job_priority = self.get_job_pool(tile_ids)
+
         try:
-            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
-                                                secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
-                                                radius_tolerance, platforms, match_once, self.tile_service_factory,
-                                                prioritize_distance, sc=self._sc)
-        except Exception as e:
-            self.log.exception(e)
-            raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+            self._sc.setJobGroup(execution_id, execution_id)
+            self._sc.setLocalProperty('spark.scheduler.pool', job_priority)
+            spark_result = spark_matchup_driver(
+                tile_ids, wkt.dumps(bounding_polygon),
+                primary_ds_name,
+                secondary_ds_names,
+                parameter_s,
+                depth_min,
+                depth_max, time_tolerance,
+                radius_tolerance,
+                platforms,
+                match_once,
+                self.tile_service_factory,
+                sc=self._sc,
+                prioritize_distance=prioritize_distance
+            )
+        except Exception as error:
+            self.log.exception(error)
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message=str(error),
+                    stats=None,
+                    results=None
+                )
+            return
 
+        self.log.debug("Building and saving results")
         end = datetime.utcnow()
 
-        self.log.debug("Building and saving results")
+        total_keys = len(list(spark_result.keys()))
+        total_values = sum(len(v) for v in spark_result.values())
+        details = {
+            "timeToComplete": int((end - start).total_seconds()),
+            "numSecondaryMatched": total_values,
+            "numPrimaryMatched": total_keys
+        }
+
+        matches = Matchup.convert_to_matches(spark_result)
+
+        with ResultsStorage(self.config) as storage:
+            storage.updateExecution(
+                uuid.UUID(execution_id),
+                completeTime=end,
+                status=ExecutionStatus.SUCCESS.value,
+                message=None,
+                stats=details,
+                results=matches
+            )
+
+    def calc(self, request, tornado_io_loop, **args):
+        start = datetime.utcnow()
+        # TODO Assuming Satellite primary
+        bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
+        start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+        depth_min, depth_max, time_tolerance, radius_tolerance, \
+        platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
+
         args = {
             "primary": primary_ds_name,
             "matchup": secondary_ds_names,
@@ -283,37 +330,63 @@ class Matchup(NexusCalcSparkHandler):
         if depth_max is not None:
             args["depthMax"] = float(depth_max)
 
-        total_keys = len(list(spark_result.keys()))
-        total_values = sum(len(v) for v in spark_result.values())
-        details = {
-            "timeToComplete": int((end - start).total_seconds()),
-            "numSecondaryMatched": total_values,
-            "numPrimaryMatched": total_keys
-        }
 
-        matches = Matchup.convert_to_matches(spark_result)
+        with ResultsStorage(self.config) as resultsStorage:
+            execution_id = str(resultsStorage.insertInitialExecution(
+                params=args,
+                startTime=start,
+                status=ExecutionStatus.RUNNING.value
+            ))
 
-        def do_result_insert():
-            with ResultsStorage(self.config) as storage:
-                storage.insertResults(results=matches, params=args, stats=details,
-                                      startTime=start, completeTime=end, userEmail="",
-                                      execution_id=execution_id)
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
 
-        threading.Thread(target=do_result_insert).start()
+        self.log.info('Found %s tile_ids', len(tile_ids))
 
-        # Get only the first "result_size_limit" results
-        # '0' means returns everything
-        if result_size_limit > 0:
-            return_matches = matches[0:result_size_limit]
-        else:
-            return_matches = matches
+        if not tile_ids:
+            # There are no matching tiles
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message='No tiles matched the provided domain',
+                    stats=None,
+                    results=None
+                )
 
-        result = DomsQueryResults(results=return_matches, args=args,
-                                  details=details, bounds=None,
-                                  count=len(matches), computeOptions=None,
-                                  executionId=execution_id)
+        # Start async processing with Spark. Do not wait for response
+        # before returning to user.
+        tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+            self.async_calc,
+            execution_id=execution_id,
+            tile_ids=tile_ids,
+            bounding_polygon=bounding_polygon,
+            primary_ds_name=primary_ds_name,
+            secondary_ds_names=secondary_ds_names,
+            parameter_s=parameter_s,
+            start_time=start_time,
+            end_time=end_time,
+            depth_min=depth_min,
+            depth_max=depth_max,
+            time_tolerance=time_tolerance,
+            radius_tolerance=radius_tolerance,
+            platforms=platforms,
+            match_once=match_once,
+            result_size_limit=result_size_limit,
+            start=start,
+            prioritize_distance=prioritize_distance
+        ))
+
+        request.requestHandler.redirect(f'/job?id={execution_id}')
 
-        return result
 
     @classmethod
     def convert_to_matches(cls, spark_result):
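
The cancel flow depends on two PySpark details visible above: setJobGroup is
thread-local, so it has to be called on the thread that submits the Spark
actions (hence it lives in async_calc rather than calc), and cancelJobGroup
can then interrupt that work from any other thread. A minimal standalone
illustration, not SDAP code:

    import threading
    import time

    from pyspark.sql import SparkSession

    # Hedged sketch: demonstrate job-group cancellation in isolation.
    spark = SparkSession.builder.master('local[2]').appName('cancel-demo').getOrCreate()
    sc = spark.sparkContext

    def long_job():
        sc.setJobGroup('demo-group', 'long-running demo job')  # thread-local
        try:
            sc.parallelize(range(10 ** 9)).map(lambda x: x * x).count()
        except Exception as e:
            print('job interrupted:', e)

    worker = threading.Thread(target=long_job)
    worker.start()
    time.sleep(5)
    sc.cancelJobGroup('demo-group')  # the same call ExecutionCancel.py makes
    worker.join()
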
diff --git a/data-access/requirements.txt b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
similarity index 79%
copy from data-access/requirements.txt
copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
index 5127018..c2a38c5 100644
--- a/data-access/requirements.txt
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-cassandra-driver==3.24.0
-pysolr==3.9.0
-elasticsearch==8.3.1
-urllib3==1.26.2
-requests
-nexusproto
-Shapely
+import logging
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+
+logger = logging.getLogger(__name__)
+
+
+class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler):
+    pass
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index e691f86..ea9b16b 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -535,6 +535,22 @@ paths:
             type: string
             enum: ['CSV', 'NETCDF', 'JSON']
           example: CSV
+        - in: query
+          name: pageNum
+          description: |
+            Page of results to retrieve.
+          required: false
+          schema:
+            type: integer
+          example: 1
+        - in: query
+          name: pageSize
+          description: |
+            Number of items to return in each page.
+          required: false
+          schema:
+            type: integer
+          example: 100
         - in: query
           name: filename
           description: |
@@ -669,6 +685,42 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/Error'
+  /job:
+    get:
+      summary: |
+        Get job status
+      operationId: job
+      tags:
+        - Analytics
+      description: "Get job status by execution id"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
+  /job/cancel:
+    get:
+      summary: |
+        Cancel running job
+      operationId: jobCancel
+      tags:
+        - Analytics
+      description: "Cancel running job"
+      parameters:
+        - in: query
+          name: id
+          description: |
+            The job execution ID
+          required: true
+          schema:
+            type: string
+            format: uuid
+          example: c864a51b-3d87-4872-9070-632820b1cae2
 externalDocs:
   description: Documentation
   url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html
diff --git a/analysis/webservice/config/scheduler.xml b/analysis/webservice/config/scheduler.xml
new file mode 100644
index 0000000..3016017
--- /dev/null
+++ b/analysis/webservice/config/scheduler.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0"?>
+<allocations>
+  <pool name="small">
+    <weight>1000</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="large">
+    <weight>1</weight>
+  </pool>
+</allocations>
\ No newline at end of file
diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index fed08b9..2a84ae7 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -37,6 +37,7 @@ class HandlerArgsBuilder:
                 class_wrapper == webservice.algorithms_spark.Matchup.Matchup
                 or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms
                 or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler)
+                or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler)
                 or class_wrapper == webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler
                 or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler
         )
diff --git a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
index ee3fd2f..5daf279 100644
--- a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pkg_resources
 
 
 class SparkContextBuilder:
@@ -25,7 +26,13 @@ class SparkContextBuilder:
         if cls.spark_context is None:
             from pyspark.sql import SparkSession
 
-            spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
+            scheduler_path = pkg_resources.resource_filename('webservice', "config/scheduler.xml")
+
+            spark = SparkSession.builder.appName("nexus-analysis").config(
+                "spark.scheduler.allocation.file", scheduler_path
+            ).config(
+                "spark.scheduler.mode", "FAIR"
+            ).getOrCreate()
             cls.spark_context = spark.sparkContext
 
         return cls.spark_context
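
Together with scheduler.xml above, this turns on Spark's FAIR scheduler: each
submitting thread selects a pool through a thread-local property, and the
small pool's weight of 1000 lets short matchup jobs overtake large ones. A
hedged sketch of the routing, mirroring get_job_pool in Matchup.py:

    # Hedged sketch: route a job to a FAIR pool defined in scheduler.xml.
    LARGE_JOB_THRESHOLD = 4000  # value from Matchup.py

    def submit_with_priority(sc, execution_id, tile_ids):
        pool = 'large' if len(tile_ids) > LARGE_JOB_THRESHOLD else 'small'
        sc.setJobGroup(execution_id, execution_id)
        sc.setLocalProperty('spark.scheduler.pool', pool)  # thread-local
        # Spark actions submitted on this thread now run in `pool`
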
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 2645574..95bddf4 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -20,6 +20,7 @@ import tornado.ioloop
 
 from webservice.nexus_tornado.request.renderers import NexusRendererFactory
 from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
+from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 
 
 class NexusRequestHandler(tornado.web.RequestHandler):
@@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler):
         # create NexusCalcHandler which will process the request
         instance = self.__clazz(**self._clazz_init_args)
 
+        io_loop = tornado.ioloop.IOLoop.current()
+
         try:
-            # process the request asynchronously on a different thread,
-            # the current tornado handler is still available to get other user requests
-            results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+            if isinstance(instance, NexusCalcSparkTornadoHandler):
+                results = instance.calc(request, io_loop)
+            else:
+                results = yield io_loop.run_in_executor(
+                    self.executor,
+                    instance.calc,
+                    request
+                )
 
             try:
                 self.set_status(results.status_code)
             except AttributeError:
                 pass
 
-            renderer = NexusRendererFactory.get_renderer(request)
-            renderer.render(self, results)
+            # Only render results if there are results to render.
+            # "NexusCalcSparkTornadoHandler" endpoints redirectm so no
+            # need to render.
+            if not isinstance(instance, NexusCalcSparkTornadoHandler):
+                renderer = NexusRendererFactory.get_renderer(request)
+                renderer.render(self, results)
 
         except NexusProcessingException as e:
             self.async_onerror_callback(e.reason, e.code)
diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py
new file mode 100644
index 0000000..d5c1204
--- /dev/null
+++ b/analysis/webservice/webmodel/NexusExecutionResults.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    return {
+        'status': job_state.value,
+        'message': message,
+        'createdAt': created,
+        'updatedAt': updated,
+        'links': [{
+            'href': f'{host}/job?id={execution_id}',
+            'title': 'The current page',
+            'type': 'application/json',
+            'rel': 'self'
+        }],
+        'params': params,
+        'jobID': execution_id
+    }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+    # Construct urls
+    formats = [
+        ('CSV', 'text/csv'),
+        ('JSON', 'application/json'),
+        ('NETCDF', 'binary/octet-stream')
+    ]
+    data_links = [{
+        'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+        'title': 'Download results',
+        'type': mime,
+        'rel': 'data'
+    } for output_format, mime in formats]
+    job_body['links'].extend(data_links)
+    return job_body
+
+
+def construct_running(status, created, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        None,
+        execution_id,
+        params,
+        host
+    )
+    job_body['links'].append({
+        'href': f'{host}/job/cancel?id={execution_id}',
+        'title': 'Cancel the job',
+        'rel': 'cancel'
+    })
+    return job_body
+
+
+def construct_error(status, created, completed, execution_id, message, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host,
+        message
+    )
+
+
+def construct_cancelled(status, created, completed, execution_id, params, host):
+    return construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+
+class NexusExecutionResults:
+    def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
+                 params=None, host=None, status_code=200):
+        self.status_code = status_code
+        self.status = status
+        self.created = created
+        self.completed = completed
+        self.execution_id = execution_id
+        self.message = message
+        self.execution_params = params
+        self.host = host
+
+    def toJson(self):
+        params = {
+            'status': self.status,
+            'created': self.created,
+            'execution_id': self.execution_id,
+            'params': self.execution_params,
+            'host': self.host
+        }
+        if self.status == ExecutionStatus.SUCCESS:
+            params['completed'] = self.completed
+            construct = construct_done
+        elif self.status == ExecutionStatus.RUNNING:
+            construct = construct_running
+        elif self.status == ExecutionStatus.FAILED:
+            params['completed'] = self.completed
+            params['message'] = self.message
+            construct = construct_error
+        elif self.status == ExecutionStatus.CANCELLED:
+            params['completed'] = self.completed
+            construct = construct_cancelled
+        else:
+            # Raise error -- job state is invalid
+            raise ValueError(f'Unable to fetch status for execution {self.execution_id}')
+
+        job_details = construct(**params)
+        return json.dumps(job_details, indent=4, default=str)
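
For reference, a hedged sketch of the status document this module renders for
a running job; the timestamp, params, and host are illustrative values rather
than output captured from a real deployment:

    from datetime import datetime

    # Hedged sketch: illustrative values only.
    body = construct_running(
        status=ExecutionStatus.RUNNING,
        created=datetime(2023, 8, 21, 17, 49, 28),
        execution_id='c864a51b-3d87-4872-9070-632820b1cae2',
        params={'primary': 'example-dataset'},
        host='http://localhost:8083',
    )
    # body['status'] == 'running'; body['links'] holds the 'self' link plus
    # the 'cancel' link appended by construct_running.
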
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index 5127018..f91f180 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+numpy==1.24.3
 cassandra-driver==3.24.0
 pysolr==3.9.0
 elasticsearch==8.3.1