Posted to commits@sdap.apache.org by sk...@apache.org on 2023/05/17 00:13:13 UTC

[incubator-sdap-nexus] branch master updated: SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new d0c48eb  SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)
d0c48eb is described below

commit d0c48eb62405672fa3c2435daa7a1142142a2f60
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Tue May 16 17:13:07 2023 -0700

    SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)
    
    * SDAP-453: Updated storage/retrieval to store/fetch complete matchup data
    
    Checked for /cdmsresults output types JSON and CSV
    Not yet working for NETCDF
    
    * SDAP-453: Script to convert old result table to new one
    
    * SDAP-453: NETCDF support
    
    * Made output parameter case-insensitive
    
    * Changelog update
    
    * SDAP-453: Format CSV output
    
    Correctly write variable data instead of dumping JSON
    
    * SDAP-453: NetCDF formatting
    
    * SDAP-453: Bit of backwards compatibility with the old db schema
    
    I think this will only support the JSON renderer
    
    * Update conversion script to handle errors
    
    * Update script correctly assigns to cf_variable_name
    
    * NetCDF
    
    * Matchup insitu id field
    
    1) Ensure it is never null or empty string
    2) Append the depth to the id in order to avoid filtering out measurements at different depths at the same lat+lon+time during NetCDF rendering
    
    * Removed traceback logging statement
    
    * Added CLI args to update script
    
    * Added more details to changelog
    
    * Updated apidocs to add JSON as output option
    
    * Typo in changelog entry
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 CHANGELOG.md                                       |   6 +
 .../webservice/algorithms/doms/BaseDomsHandler.py  | 105 +++++++---
 .../algorithms/doms/DomsInitialization.py          |   3 +-
 .../webservice/algorithms/doms/ResultsRetrieval.py |   2 +-
 .../webservice/algorithms/doms/ResultsStorage.py   | 191 +++++++++--------
 analysis/webservice/algorithms_spark/Matchup.py    |  11 +-
 analysis/webservice/apidocs/openapi.yml            |   2 +-
 .../request/renderers/NexusRendererFactory.py      |   2 +-
 tools/update-doms-data-schema/update.py            | 226 +++++++++++++++++++++
 9 files changed, 433 insertions(+), 115 deletions(-)
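
For orientation, below is a minimal sketch (not part of this patch) of the nested result structure that the updated storage and retrieval code round-trips so that /cdmsresults JSON output can match /match_spark output. Field names follow the entry dicts built in the retrieval code in ResultsStorage.py; all values, dataset names, and URLs are hypothetical.

import json
from datetime import datetime, timezone

# Hedged example of one primary point with one secondary match. The per-point
# variable records (variable_name, cf_variable_name, variable_value,
# variable_unit) are what now get stored as JSON text in
# doms_data.measurement_values_json; everything here is illustrative.
example_result = {
    "id": "PRIMARY_POINT_ID",                         # hypothetical
    "lon": "-81.000",
    "lat": "25.000",
    "point": "Point(-81.000 25.000)",
    "source": "primary-gridded-dataset",              # hypothetical
    "platform": "orbiting satellite",
    "device": None,
    "time": datetime(2018, 1, 1, tzinfo=timezone.utc),
    "fileurl": "https://example.invalid/granule.nc",  # hypothetical
    "primary": [
        {
            "variable_name": "analysed_sst",
            "cf_variable_name": "sea_surface_foundation_temperature",
            "variable_value": 290.5,
            "variable_unit": "kelvin",
        }
    ],
    "matches": [
        {
            "id": "SECONDARY_POINT_ID@1.5",           # depth appended per this commit
            "lon": "-81.010",
            "lat": "25.020",
            "depth": 1.5,
            "source": "secondary-insitu-dataset",     # hypothetical
            "platform": "drifting surface float",
            "device": None,
            "time": datetime(2018, 1, 1, 0, 30, tzinfo=timezone.utc),
            "secondary": [
                {
                    "variable_name": "sea_water_temperature",
                    "cf_variable_name": "sea_water_temperature",
                    "variable_value": 290.1,
                    "variable_unit": "kelvin",
                }
            ],
        }
    ],
}

# The variable lists are what ResultsStorage serializes with json.dumps and
# ResultsRetrieval reads back with json.loads.
print(json.dumps(example_result["primary"], indent=2))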

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b3a3b6..dfbea59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - SDAP-461: Added 4 remaining Saildrone insitu datasets.
 ### Changed
+- SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`.
+  - **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py`
+  - Additional changes:
+    - Made the `output` parameter case-insensitive
+    - Improved speed of results insert
+    - Updated `id` field of insitu points to include depth. This solves an issue with NetCDF result rendering where different insitu observations at the same place & time but at different depths were being excluded for having the same `id`.
 ### Deprecated
 ### Removed
 ### Fixed
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index 904732b..dad1605 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -125,16 +125,33 @@ class DomsCSVFormatter:
 
         return csv_out
 
+    @staticmethod
+    def __get_variable_name(variable):
+        def is_empty(s):
+            return s is None or s == ''
+
+        name = variable['cf_variable_name']
+
+        return name if not is_empty(name) else variable['variable_name']
+
     @staticmethod
     def __packValues(csv_mem_file, results):
         primary_headers = list(dict.fromkeys(
-            key for result in results for key in result if key != 'matches'
+            key for result in results for key in result if key not in ['matches', 'primary']
         ))
 
+        primary_headers.extend(list(dict.fromkeys(
+            DomsCSVFormatter.__get_variable_name(variable) for result in results for variable in result['primary']
+        )))
+
         secondary_headers = list(dict.fromkeys(
-            key for result in results for match in result['matches'] for key in match
+            key for result in results for match in result['matches'] for key in match if key != 'secondary'
         ))
 
+        secondary_headers.extend(list(dict.fromkeys(
+            DomsCSVFormatter.__get_variable_name(variable) for result in results for match in result['matches'] for variable in match['secondary']
+        )))
+
         writer = csv.writer(csv_mem_file)
         writer.writerow(list(itertools.chain(primary_headers, secondary_headers)))
 
@@ -145,13 +162,24 @@ class DomsCSVFormatter:
                 for key, value in primaryValue.items():
                     if key == 'matches':
                         continue
-                    index = primary_headers.index(key)
-                    primary_row[index] = value
+
+                    if key != 'primary':
+                        index = primary_headers.index(key)
+                        primary_row[index] = value
+                    else:
+                        for variable in value:
+                            index = primary_headers.index(DomsCSVFormatter.__get_variable_name(variable))
+                            primary_row[index] = variable['variable_value']
                 # Secondary
                 secondary_row = [None for _ in range(len(secondary_headers))]
                 for key, value in matchup.items():
-                    index = secondary_headers.index(key)
-                    secondary_row[index] = value
+                    if key != 'secondary':
+                        index = secondary_headers.index(key)
+                        secondary_row[index] = value
+                    else:
+                        for variable in value:
+                            index = secondary_headers.index(DomsCSVFormatter.__get_variable_name(variable))
+                            secondary_row[index] = variable['variable_value']
                 writer.writerow(list(itertools.chain(primary_row, secondary_row)))
 
     @staticmethod
@@ -241,8 +269,8 @@ class DomsCSVFormatter:
 
             {"Global Attribute": "CDMS_time_to_complete", "Value": details["timeToComplete"]},
             {"Global Attribute": "CDMS_time_to_complete_units", "Value": "seconds"},
-            {"Global Attribute": "CDMS_num_secondary_matched", "Value": details["numInSituMatched"]},
-            {"Global Attribute": "CDMS_num_primary_matched", "Value": details["numGriddedMatched"]},
+            {"Global Attribute": "CDMS_num_secondary_matched", "Value": details["numSecondaryMatched"]},
+            {"Global Attribute": "CDMS_num_primary_matched", "Value": details["numPrimaryMatched"]},
 
             {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
             {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
@@ -272,8 +300,8 @@ class DomsNetCDFFormatter:
         dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
         dataset.time_coverage_resolution = "point"
         dataset.CDMS_secondary = params["matchup"]
-        dataset.CDMS_num_matchup_matched = details["numInSituMatched"]
-        dataset.CDMS_num_primary_matched = details["numGriddedMatched"]
+        dataset.CDMS_num_matchup_matched = details["numSecondaryMatched"]
+        dataset.CDMS_num_primary_matched = details["numPrimaryMatched"]
 
         bbox = geo.BoundingBox(asString=params["bbox"])
         dataset.geospatial_lat_max = bbox.north
@@ -377,13 +405,15 @@ class DomsNetCDFFormatter:
 
             # Add each match only if it is not already in the array of in situ points
             for match in result["matches"]:
-                if match["id"] not in ids:
-                    ids[match["id"]] = insituIndex
+                key = (match['id'], f'{match["depth"]:.4}')
+
+                if key not in ids:
+                    ids[key] = insituIndex
                     insituIndex += 1
                     insituWriter.addData(match)
 
                 # Append an index pair of (satellite, in situ) to the array of matches
-                matches.append((r, ids[match["id"]]))
+                matches.append((r, ids[key]))
 
         # Add data/write to the netCDF file
         satelliteWriter.writeGroup()
@@ -413,7 +443,8 @@ class DomsNetCDFValueWriter:
         non_data_fields = [
             'id', 'lon', 'lat',
             'source', 'device',
-            'platform', 'time', 'matches'
+            'platform', 'time', 'matches',
+            'point', 'fileurl'
         ]
         self.lat.append(result_item.get('lat', None))
         self.lon.append(result_item.get('lon', None))
@@ -461,16 +492,38 @@ class DomsNetCDFValueWriter:
             depthVar[:] = self.depth
 
         for variable_name, data in self.data_map.items():
-            # Create a variable for each data point
-            data_variable = self.group.createVariable(variable_name, 'f4', ('dim',), fill_value=-32767.0)
-            # Find min/max for data variables. It is possible for 'None' to
-            # be in this list, so filter those out when doing the calculation.
-            min_data = min(val for val in data if val is not None)
-            max_data = max(val for val in data if val is not None)
-            self.__enrichVariable(data_variable, min_data, max_data, has_depth=self.depth)
-            data_variable[:] = data
-            data_variable.long_name = variable_name
-            data_variable.standard_name = variable_name
+            units = {}
+
+            variables = dict.fromkeys(
+                ((variable['variable_name'], variable['cf_variable_name']) for match in data for variable in match),
+                None
+            )
+
+            for variable in variables:
+                variables[variable] = np.repeat(np.nan, len(data))
+
+            for i, match in enumerate(data):
+                for variable in match:
+                    key = (variable['variable_name'], variable['cf_variable_name'])
+                    unit = variable['variable_unit']
+                    units[key] = str(unit) if unit is not None else 'UNKNOWN'
+                    variables[key][i] = variable['variable_value']
+
+            for variable in variables:
+                # Create a variable for each data point
+                name = variable[0]
+                cf_name = variable[1]
+
+                data_variable = self.group.createVariable(
+                    cf_name if cf_name is not None and cf_name != '' else name, 'f4', ('dim',), fill_value=-32767.0)
+                # Find min/max for data variables. It is possible for 'None' to
+                # be in this list, so filter those out when doing the calculation.
+                min_data = np.nanmin(variables[variable])
+                max_data = np.nanmax(variables[variable])
+                self.__enrichVariable(data_variable, min_data, max_data, has_depth=None, unit=units[variable])
+                data_variable[:] = np.ma.masked_invalid(variables[variable])
+                data_variable.long_name = name
+                data_variable.standard_name = cf_name
 
     #
     # Lists may include 'None' values; to calc min these must be filtered out
@@ -480,13 +533,13 @@ class DomsNetCDFValueWriter:
         return min(x for x in var if x is not None)
 
     @staticmethod
-    def __enrichVariable(var, var_min, var_max, has_depth):
+    def __enrichVariable(var, var_min, var_max, has_depth, unit='UNKNOWN'):
         coordinates = ['lat', 'lon', 'depth', 'time']
 
         if not has_depth:
             coordinates = ['lat', 'lon', 'time']
 
-        var.units = 'UNKNOWN'  # TODO populate this field once this metadata is in place
+        var.units = unit
         var.valid_min = var_min
         var.valid_max = var_max
         var.coordinates = ' '.join(coordinates)
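
The CSV changes above build column headers from the per-point variable lists rather than from a flat key/value map. Here is a standalone sketch of that header-building idea with hypothetical data (the real logic is DomsCSVFormatter.__packValues and __get_variable_name above):

# Hedged sketch: derive de-duplicated, order-preserving CSV headers, preferring
# cf_variable_name and falling back to variable_name when it is None or empty.
def variable_column_name(variable):
    name = variable.get('cf_variable_name')
    return name if name not in (None, '') else variable['variable_name']

results = [  # hypothetical results
    {
        'id': 'a', 'lon': -81.0, 'lat': 25.0,
        'primary': [{'variable_name': 'analysed_sst',
                     'cf_variable_name': 'sea_surface_foundation_temperature',
                     'variable_value': 290.5}],
        'matches': [],
    },
]

primary_headers = list(dict.fromkeys(
    key for result in results for key in result if key not in ('matches', 'primary')
))
primary_headers.extend(dict.fromkeys(
    variable_column_name(v) for result in results for v in result['primary']
))
print(primary_headers)  # ['id', 'lon', 'lat', 'sea_surface_foundation_temperature']
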
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 591c5f6..b695c9e 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -163,8 +163,9 @@ class DomsInitializer:
               measurement_time timestamp,
               platform text,
               device text,
-              measurement_values map<text, decimal>,
+              measurement_values_json text,
               depth decimal,
+              file_url text,
               PRIMARY KEY (execution_id, is_primary, id)
             );
         """
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
index 4007054..c3b95b0 100644
--- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -46,5 +46,5 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
         with ResultsStorage.ResultsRetrieval(self.config) as storage:
             params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
 
-        return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
+        return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data),
                                                 computeOptions=None, executionId=execution_id)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index c989286..d0ead7e 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -16,6 +16,7 @@
 import configparser
 import json
 import logging
+from time import sleep
 import math
 import uuid
 from datetime import datetime
@@ -28,6 +29,13 @@ from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
 from cassandra.query import BatchStatement
 from pytz import UTC
 from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
+from webservice.webmodel import NexusProcessingException
+
+BATCH_SIZE = 1024
+
+
+class ResultInsertException(IOError):
+    pass
 
 
 class AbstractResultsContainer:
@@ -159,85 +167,92 @@ class ResultsStorage(AbstractResultsContainer):
 
         cql = """
            INSERT INTO doms_data
-                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary, depth)
+                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url)
            VALUES
-                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         """
         insertStatement = self._session.prepare(cql)
-        batch = BatchStatement()
+
+        inserts = []
 
         for result in results:
-            self.__insertResult(execution_id, None, result, batch, insertStatement)
+            inserts.extend(self.__prepare_result(execution_id, None, result, insertStatement))
+
+        for i in range(5):
+            if not self.__insert_result_batches(inserts, insertStatement):
+                if i < 4:
+                    self._log.warning('Some write attempts failed; retrying')
+                    sleep(10)
+                else:
+                    self._log.error('Some write attempts failed; max retries exceeded')
+                    raise ResultInsertException('Some result inserts failed')
+            else:
+                break
+
+
+    def __insert_result_batches(self, insert_params, insertStatement):
+        query_batches = [insert_params[i:i + BATCH_SIZE] for i in range(0, len(insert_params), BATCH_SIZE)]
+        move_successful = True
+
+        n_inserts = len(insert_params)
+        writing = 0
+
+        self._log.info(f'Inserting {n_inserts} matchup entries in JSON format')
+
+        for batch in query_batches:
+            futures = []
+            writing += len(batch)
+            self._log.info(
+                f'Writing batch of {len(batch)} matchup entries | ({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]')
 
-        self._session.execute(batch)
+            for entry in batch:
+                futures.append(self._session.execute_async(insertStatement, entry))
 
-    def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
-        data_dict = {}
+            for future in futures:
+                try:
+                    future.result()
+                except Exception:
+                    move_successful = False
+
+        self._log.info('Result data write attempt completed')
+        return move_successful
+
+    def __prepare_result(self, execution_id, primaryId, result, insertStatement):
         if 'primary' in result:
-            data_dict = result['primary']
+            data = result['primary']
         elif 'secondary' in result:
-            data_dict = result['secondary']
+            data = result['secondary']
+        else:
+            data = []
 
-        dataMap = self.__buildDataMap(data_dict)
         result_id = uuid.uuid4()
 
         insert_params = (
-                result_id,
-                execution_id,
-                result["id"],
-                primaryId,
-                result["lon"],
-                result["lat"],
-                result["source"],
-                result["time"],
-                result["platform"] if "platform" in result else None,
-                result["device"] if "device" in result else None,
-                dataMap,
-                1 if primaryId is None else 0,
-                result["depth"]
-            )
-
-        try:
-            batch.add(insertStatement, insert_params)
-        except:
-            self._log.error(f'Result batch INSERT preparation failed')
-            self._log.error('INSERT params %s', str(insert_params))
-            raise
+            result_id,
+            execution_id,
+            result["id"],
+            primaryId,
+            result["lon"],
+            result["lat"],
+            result["source"],
+            result["time"],
+            result["platform"] if "platform" in result else None,
+            result["device"] if "device" in result else None,
+            json.dumps(data, cls=DomsEncoder),
+            1 if primaryId is None else 0,
+            result["depth"],
+            result['fileurl']
+        )
+
+        params_list = [insert_params]
 
-        n = 0
         if "matches" in result:
             for match in result["matches"]:
-                self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
-                n += 1
-                if n >= 20:
-                    if primaryId is None:
-                        self.__commitBatch(batch)
-                    n = 0
-
-        if primaryId is None:
-            self.__commitBatch(batch)
-
-    def __commitBatch(self, batch):
-        self._session.execute(batch)
-        batch.clear()
-
-    def __buildDataMap(self, result):
-        dataMap = {}
-        for data_dict in result:
-            name = data_dict.get('cf_variable_name')
-
-            if name is None:
-                name = data_dict['variable_name']
+                params_list.extend(self.__prepare_result(execution_id, result["id"], match, insertStatement))
 
-            value = data_dict['variable_value']
-            if isinstance(value, np.generic):
-                value = value.item()
+        return params_list
 
-            if math.isnan(value):
-                value = None
 
-            dataMap[name] = value
-        return dataMap
 
 
 class ResultsRetrieval(AbstractResultsContainer):
@@ -292,30 +307,37 @@ class ResultsRetrieval(AbstractResultsContainer):
             }
         else:
             entry = {
-                "id": row.value_id,
-                "lon": float(row.x),
-                "lat": float(row.y),
-                "source": row.source_dataset,
-                "device": row.device,
                 "platform": row.platform,
+                "device": row.device,
+                "lon": str(row.x),
+                "lat": str(row.y),
+                "point": f"Point({float(row.x):.3f} {float(row.y):.3f})",
                 "time": row.measurement_time.replace(tzinfo=UTC),
-                "depth": row.depth
+                "depth": float(row.depth) if row.depth is not None else None,
+                "fileurl": row.file_url if hasattr(row, 'file_url') else None,
+                "id": row.value_id,
+                "source": row.source_dataset,
             }
-        for key in row.measurement_values:
-            value = float(row.measurement_values[key])
-            entry[key] = value
+
+        # If doms_data uses the old schema, default to original behavior
+
+        try:
+            entry['primary' if row.is_primary else 'secondary'] = json.loads(row.measurement_values_json)
+        except AttributeError:
+            for key in row.measurement_values:
+                value = float(row.measurement_values[key])
+                entry[key] = value
+
         return entry
 
     def __retrieveStats(self, id):
-        cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
+        cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
             stats = {
-                "numGriddedMatched": row.num_gridded_matched,
-                "numGriddedChecked": row.num_gridded_checked,
-                "numInSituMatched": row.num_insitu_matched,
-                "numInSituChecked": row.num_insitu_checked,
-                "timeToComplete": row.time_to_complete
+                "timeToComplete": row.time_to_complete,
+                "numSecondaryMatched": row.num_insitu_matched,
+                "numPrimaryMatched": row.num_gridded_matched,
             }
             return stats
 
@@ -325,18 +347,23 @@ class ResultsRetrieval(AbstractResultsContainer):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
         rows = self._session.execute(cql, (id,))
         for row in rows:
+            matchup = row.matchup_datasets.split(",")
+
+            if len(matchup) == 1:
+                matchup = matchup[0]
+
             params = {
                 "primary": row.primary_dataset,
-                "matchup": row.matchup_datasets.split(","),
-                "depthMin": row.depth_min,
-                "depthMax": row.depth_max,
-                "timeTolerance": row.time_tolerance,
-                "radiusTolerance": row.radius_tolerance,
+                "matchup": matchup,
                 "startTime": row.start_time.replace(tzinfo=UTC),
                 "endTime": row.end_time.replace(tzinfo=UTC),
-                "platforms": row.platforms,
                 "bbox": row.bounding_box,
-                "parameter": row.parameter
+                "timeTolerance": int(row.time_tolerance) if row.time_tolerance is not None else None,
+                "radiusTolerance": float(row.radius_tolerance) if row.radius_tolerance is not None else None,
+                "platforms": row.platforms,
+                "parameter": row.parameter,
+                "depthMin": float(row.depth_min) if row.depth_min is not None else None,
+                "depthMax": float(row.depth_max) if row.depth_max is not None else None,
             }
             return params
 
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 6cd4676..2099ade 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -563,10 +563,15 @@ class DomsPoint(object):
             ) if var_value])
         point.data = data
 
-        if 'meta' in edge_point:
-            point.data_id = edge_point['meta']
+        meta = edge_point.get('meta', None)
+
+        # Appending depth to data_id. Currently, our insitu data has the same id value for measurements taken at
+        # different depths. This causes secondary insitu matches to be incorrectly filtered out from NetCDF files.
+
+        if meta:
+            point.data_id = f'{meta}@{point.depth}'
         else:
-            point.data_id = f'{point.time}:{point.longitude}:{point.latitude}'
+            point.data_id = f'{point.time}:{point.longitude}:{point.latitude}@{point.depth}'
 
         return point
 
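The Matchup.py change above appends the point depth to each secondary point's id. A short sketch of why that matters for the NetCDF writer, which (per the BaseDomsHandler.py change earlier in this patch) de-duplicates in situ points by an (id, depth) key; the ids and depths below are made up:

# Hedged illustration: two hypothetical observations from the same insitu
# profile share an id but differ in depth. Keying only on id would drop the
# second one from the NetCDF output; appending depth (and keying on
# (id, depth)) keeps both.
observations = [
    {'id': 'CAST_42', 'depth': 1.5, 'value': 290.1},
    {'id': 'CAST_42', 'depth': 10.0, 'value': 288.7},
]

ids = {}
kept = []
for obs in observations:
    data_id = f"{obs['id']}@{obs['depth']}"      # id scheme from this commit
    key = (data_id, f"{obs['depth']:.4}")        # dedup key as in DomsNetCDFFormatter
    if key not in ids:
        ids[key] = len(kept)
        kept.append(obs)

print(len(kept))  # 2: both depths survive de-duplication
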
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index a596fc3..b8250ee 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -533,7 +533,7 @@ paths:
           required: true
           schema:
             type: string
-            enum: ['CSV', 'NETCDF']
+            enum: ['CSV', 'NETCDF', 'JSON']
           example: CSV
       responses:
         '200':
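
The openapi.yml change above adds JSON as an accepted value for the /cdmsresults output parameter, and the renderer change in the next file makes that parameter case-insensitive. A hedged client-side sketch (the base URL and the execution-id parameter name "id" are assumptions, not taken from this patch):

import requests

BASE_URL = "http://localhost:8083"                       # hypothetical SDAP deployment
EXECUTION_ID = "00000000-0000-0000-0000-000000000000"    # hypothetical

# 'json', 'JSON', 'Json', ... are all accepted now that the renderer factory
# upper-cases the requested output type.
resp = requests.get(
    f"{BASE_URL}/cdmsresults",
    params={"id": EXECUTION_ID, "output": "json"},       # "id" param name is an assumption
)
resp.raise_for_status()
results = resp.json()
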
diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
index 273bd39..e0dabe2 100644
--- a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
+++ b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
@@ -19,7 +19,7 @@ class NexusRendererFactory(object):
 
     @classmethod
     def get_renderer(cls, request):
-        content_type = request.get_content_type()
+        content_type = request.get_content_type().upper()
         if content_type in cls.content_types:
             renderer_name = 'Nexus' + content_type + 'Renderer'
             renderer = getattr(cls.module.nexus_tornado.request.renderers, renderer_name)
diff --git a/tools/update-doms-data-schema/update.py b/tools/update-doms-data-schema/update.py
new file mode 100644
index 0000000..1f08b12
--- /dev/null
+++ b/tools/update-doms-data-schema/update.py
@@ -0,0 +1,226 @@
+import argparse
+import configparser
+import decimal
+import json
+import logging
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
+                                WhiteListRoundRobinPolicy)
+
+try:
+    logging.getLogger('webservice.NexusHandler').setLevel(logging.CRITICAL)
+except:
+    pass
+
+from webservice.algorithms.doms.DomsInitialization import DomsInitializer
+
+BATCH_SIZE = 1024
+log = logging.getLogger(__name__)
+
+
+class Encoder(json.JSONEncoder):
+    def __init__(self, **args):
+        json.JSONEncoder.__init__(self, **args)
+
+    def default(self, obj):
+        if isinstance(obj, decimal.Decimal):
+            return float(obj)
+        else:
+            return json.JSONEncoder.default(self, obj)
+
+
+def main():
+    domsconfig = configparser.ConfigParser()
+    domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument('-u', '--cassandra-username',
+                        dest='username', help='The username used to connect to Cassandra.',
+                        required=True, metavar='USERNAME')
+
+    parser.add_argument('-p', '--cassandra-password',
+                        dest='password', help='The password used to connect to Cassandra.',
+                        required=True, metavar='PASSWORD')
+
+    parser.add_argument('--cassandra', help='The hostname(s) or IP(s) of the Cassandra server(s).',
+                        required=False,
+                        default=domsconfig.get("cassandra", "host"),
+                        dest='hosts',
+                        nargs='+',
+                        metavar=('localhost', '127.0.0.101'))
+
+    parser.add_argument('--cassandraPort',
+                        help='The port used to connect to Cassandra.',
+                        dest='port',
+                        required=False,
+                        default=domsconfig.get("cassandra", "port"))
+
+    args = parser.parse_args()
+
+    cassHost = args.hosts
+    cassPort = args.port
+    cassUsername = args.username
+    cassPassword = args.password
+    cassKeyspace = domsconfig.get("cassandra", "keyspace")
+    cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+    cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+    cassPolicy = domsconfig.get("cassandra", "dc_policy")
+
+    log.info("Cassandra Host(s): %s" % (cassHost))
+    log.info("Cassandra Keyspace: %s" % (cassKeyspace))
+    log.info("Cassandra Datacenter: %s" % (cassDatacenter))
+    log.info("Cassandra Protocol Version: %s" % (cassVersion))
+    log.info("Cassandra DC Policy: %s" % (cassPolicy))
+    log.info("Cassandra Auth: %s : %s" % (cassUsername, cassPassword))
+
+    if cassPolicy == 'DCAwareRoundRobinPolicy':
+        dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+        token_policy = TokenAwarePolicy(dc_policy)
+    elif cassPolicy == 'WhiteListRoundRobinPolicy':
+        token_policy = WhiteListRoundRobinPolicy([cassHost])
+    else:
+        raise ValueError(cassPolicy)
+
+    if cassUsername and cassPassword:
+        auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
+    else:
+        auth_provider = None
+
+    try:
+        with Cluster([host for host in cassHost.split(',')],
+                     port=int(cassPort),
+                     execution_profiles={
+                         EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy)
+                     },
+                     protocol_version=cassVersion,
+                     auth_provider=auth_provider) as cluster:
+            session = cluster.connect(cassKeyspace)
+
+            cql = """
+            alter table doms_data
+                 add measurement_values_json text;
+            """
+
+            log.info('Creating measurement_values_json column')
+
+            try:
+                session.execute(cql)
+            except:
+                log.warning('measurement_values_json column creation failed; perhaps it already exists')
+
+            cql = """
+            alter table doms_data
+                 add file_url text;
+            """
+
+            log.info('Creating file_url column')
+
+            try:
+                session.execute(cql)
+            except:
+                log.warning('file_url column creation failed; perhaps it already exists')
+
+            for i in range(5):
+                if not move_data(session):
+                    if i < 4:
+                        log.warning('Some move attempts failed; retrying')
+                    else:
+                        log.critical('Some move attempts failed; max retries exceeded')
+                        exit(1)
+                else:
+                    break
+
+            cql = """
+            alter table doms_data
+                drop measurement_values;
+            """
+
+            log.info('Dropping old measurement_values column')
+            session.execute(cql)
+    except NoHostAvailable as e:
+        log.error("Unable to connect to Cassandra, Nexus will not be able to access local data ", e)
+    except Exception as e:
+        log.critical('An uncaught exception occurred')
+        log.exception(e)
+        exit(2)
+
+
+def move_data(session):
+    cql = """
+        SELECT execution_id, is_primary, id, measurement_values FROM doms_data;
+    """
+
+    log.info('Fetching execution measurements')
+
+    try:
+        rows = session.execute(cql)
+    except:
+        log.warning('SELECT query failed; the measurement_values column may no longer exist')
+        exit(0)
+
+    update_params = []
+
+    for row in rows:
+        if row.measurement_values is not None:
+            update_params.append((
+                json.dumps(translate_values(dict(row.measurement_values)), indent=4, cls=Encoder),  # values
+                row.execution_id,  # execution_id
+                row.is_primary,  # is_primary
+                row.id,  # id
+            ))
+
+    update_cql = """
+        UPDATE doms_data
+        SET measurement_values=null,
+        measurement_values_json=?
+        WHERE execution_id=? AND 
+        is_primary=? AND
+        id=?;
+    """
+
+    update_query = session.prepare(update_cql)
+    query_batches = [update_params[i:i + BATCH_SIZE] for i in range(0, len(update_params), BATCH_SIZE)]
+    move_successful = True
+
+    n_entries = len(update_params)
+    writing = 0
+
+    log.info(f'Writing {n_entries} entries in JSON format')
+
+    for batch in query_batches:
+        futures = []
+        writing += len(batch)
+        log.info(f'Writing batch of {len(batch)} entries | ({writing}/{n_entries}) [{writing/n_entries*100:7.3f}%]')
+
+        for entry in batch:
+            futures.append(session.execute_async(update_query, entry))
+
+        for future in futures:
+            try:
+                future.result()
+            except Exception:
+                move_successful = False
+
+    log.info('Move attempt completed')
+    return move_successful
+
+
+def translate_values(values_dict):
+    values = []
+
+    for key in values_dict:
+        values.append({
+            "variable_name": "",
+            "cf_variable_name": key,
+            "variable_value": values_dict[key],
+            "variable_unit": None
+        })
+
+    return values
+
+
+if __name__ == '__main__':
+    main()
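
As a usage note for the migration script above: Cassandra returns the old map<text, decimal> column with decimal.Decimal values, which is why the script pairs translate_values with the Decimal-aware Encoder. A quick hedged example of that conversion on a made-up row:

import decimal
import json

# Assumes translate_values and Encoder from update.py above are in scope.
old_measurement_values = {'sea_surface_temperature': decimal.Decimal('290.5')}

records = translate_values(old_measurement_values)
print(json.dumps(records, indent=4, cls=Encoder))
# [
#     {
#         "variable_name": "",
#         "cf_variable_name": "sea_surface_temperature",
#         "variable_value": 290.5,
#         "variable_unit": null
#     }
# ]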