You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/05/17 00:13:13 UTC
[incubator-sdap-nexus] branch master updated: SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new d0c48eb SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)
d0c48eb is described below
commit d0c48eb62405672fa3c2435daa7a1142142a2f60
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Tue May 16 17:13:07 2023 -0700
SDAP-453 - Update results storage & retrieval so results JSON output matches original matchup output (#239)
* SDAP-453: Updated storage/retrieval to store/fetch complete matchup data
Checked for /cdmsresults output types JSON and CSV
Not yet working for NETCDF
* SDAP-453: Script to convert old result table to new one
* SDAP-453: NETCDF support
* Made output parameter case-insensitive
* Changelog update
* SDAP-453: Format CSV output
Correctly write variable data instead of dumping JSON
* SDAP-453: NetCDF formatting
* SDAP-453: Bit of backwards compatibility with the old db schema
I think this will only support the JSON renderer
* Update conversion script to handle errors
* Update script now correctly assigns to cf_variable_name
* NetCDF
* Matchup insitu id field
1) Ensure it is never null or empty string
2) Append the depth to the id in order to avoid filtering out measurements at different depths at the same lat+lon+time during NetCDF rendering
* Removed traceback logging statement
* Added command-line args to update script
* Added more details to changelog
* Updated apidocs to add JSON as output option
* Typo in changelog entry
---------
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
CHANGELOG.md | 6 +
.../webservice/algorithms/doms/BaseDomsHandler.py | 105 +++++++---
.../algorithms/doms/DomsInitialization.py | 3 +-
.../webservice/algorithms/doms/ResultsRetrieval.py | 2 +-
.../webservice/algorithms/doms/ResultsStorage.py | 191 +++++++++--------
analysis/webservice/algorithms_spark/Matchup.py | 11 +-
analysis/webservice/apidocs/openapi.yml | 2 +-
.../request/renderers/NexusRendererFactory.py | 2 +-
tools/update-doms-data-schema/update.py | 226 +++++++++++++++++++++
9 files changed, 433 insertions(+), 115 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b3a3b6..dfbea59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- SDAP-461: Added 4 remaining Saildrone insitu datasets.
### Changed
+- SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`.
+ - **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py`
+ - Additional changes:
+ - Made the `output` parameter case-insensitive
+ - Improved speed of results insert
+ - Updated `id` field of insitu points to include depth. This solves an issue with NetCDF result rendering where different insitu observations at the same place & time but at different depths were being excluded for having the same `id`.
### Deprecated
### Removed
### Fixed
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index 904732b..dad1605 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -125,16 +125,33 @@ class DomsCSVFormatter:
return csv_out
+ @staticmethod
+ def __get_variable_name(variable):
+ def is_empty(s):
+ return s is None or s == ''
+
+ name = variable['cf_variable_name']
+
+ return name if not is_empty(name) else variable['variable_name']
+
@staticmethod
def __packValues(csv_mem_file, results):
primary_headers = list(dict.fromkeys(
- key for result in results for key in result if key != 'matches'
+ key for result in results for key in result if key not in ['matches', 'primary']
))
+ primary_headers.extend(list(dict.fromkeys(
+ DomsCSVFormatter.__get_variable_name(variable) for result in results for variable in result['primary']
+ )))
+
secondary_headers = list(dict.fromkeys(
- key for result in results for match in result['matches'] for key in match
+ key for result in results for match in result['matches'] for key in match if key != 'secondary'
))
+ secondary_headers.extend(list(dict.fromkeys(
+ DomsCSVFormatter.__get_variable_name(variable) for result in results for match in result['matches'] for variable in match['secondary']
+ )))
+
writer = csv.writer(csv_mem_file)
writer.writerow(list(itertools.chain(primary_headers, secondary_headers)))
@@ -145,13 +162,24 @@ class DomsCSVFormatter:
for key, value in primaryValue.items():
if key == 'matches':
continue
- index = primary_headers.index(key)
- primary_row[index] = value
+
+ if key != 'primary':
+ index = primary_headers.index(key)
+ primary_row[index] = value
+ else:
+ for variable in value:
+ index = primary_headers.index(DomsCSVFormatter.__get_variable_name(variable))
+ primary_row[index] = variable['variable_value']
# Secondary
secondary_row = [None for _ in range(len(secondary_headers))]
for key, value in matchup.items():
- index = secondary_headers.index(key)
- secondary_row[index] = value
+ if key != 'secondary':
+ index = secondary_headers.index(key)
+ secondary_row[index] = value
+ else:
+ for variable in value:
+ index = secondary_headers.index(DomsCSVFormatter.__get_variable_name(variable))
+ secondary_row[index] = variable['variable_value']
writer.writerow(list(itertools.chain(primary_row, secondary_row)))
@staticmethod
@@ -241,8 +269,8 @@ class DomsCSVFormatter:
{"Global Attribute": "CDMS_time_to_complete", "Value": details["timeToComplete"]},
{"Global Attribute": "CDMS_time_to_complete_units", "Value": "seconds"},
- {"Global Attribute": "CDMS_num_secondary_matched", "Value": details["numInSituMatched"]},
- {"Global Attribute": "CDMS_num_primary_matched", "Value": details["numGriddedMatched"]},
+ {"Global Attribute": "CDMS_num_secondary_matched", "Value": details["numSecondaryMatched"]},
+ {"Global Attribute": "CDMS_num_primary_matched", "Value": details["numPrimaryMatched"]},
{"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
{"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
@@ -272,8 +300,8 @@ class DomsNetCDFFormatter:
dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
dataset.time_coverage_resolution = "point"
dataset.CDMS_secondary = params["matchup"]
- dataset.CDMS_num_matchup_matched = details["numInSituMatched"]
- dataset.CDMS_num_primary_matched = details["numGriddedMatched"]
+ dataset.CDMS_num_matchup_matched = details["numSecondaryMatched"]
+ dataset.CDMS_num_primary_matched = details["numPrimaryMatched"]
bbox = geo.BoundingBox(asString=params["bbox"])
dataset.geospatial_lat_max = bbox.north
@@ -377,13 +405,15 @@ class DomsNetCDFFormatter:
# Add each match only if it is not already in the array of in situ points
for match in result["matches"]:
- if match["id"] not in ids:
- ids[match["id"]] = insituIndex
+ key = (match['id'], f'{match["depth"]:.4}')
+
+ if key not in ids:
+ ids[key] = insituIndex
insituIndex += 1
insituWriter.addData(match)
# Append an index pait of (satellite, in situ) to the array of matches
- matches.append((r, ids[match["id"]]))
+ matches.append((r, ids[key]))
# Add data/write to the netCDF file
satelliteWriter.writeGroup()
@@ -413,7 +443,8 @@ class DomsNetCDFValueWriter:
non_data_fields = [
'id', 'lon', 'lat',
'source', 'device',
- 'platform', 'time', 'matches'
+ 'platform', 'time', 'matches',
+ 'point', 'fileurl'
]
self.lat.append(result_item.get('lat', None))
self.lon.append(result_item.get('lon', None))
@@ -461,16 +492,38 @@ class DomsNetCDFValueWriter:
depthVar[:] = self.depth
for variable_name, data in self.data_map.items():
- # Create a variable for each data point
- data_variable = self.group.createVariable(variable_name, 'f4', ('dim',), fill_value=-32767.0)
- # Find min/max for data variables. It is possible for 'None' to
- # be in this list, so filter those out when doing the calculation.
- min_data = min(val for val in data if val is not None)
- max_data = max(val for val in data if val is not None)
- self.__enrichVariable(data_variable, min_data, max_data, has_depth=self.depth)
- data_variable[:] = data
- data_variable.long_name = variable_name
- data_variable.standard_name = variable_name
+ units = {}
+
+ variables = dict.fromkeys(
+ ((variable['variable_name'], variable['cf_variable_name']) for match in data for variable in match),
+ None
+ )
+
+ for variable in variables:
+ variables[variable] = np.repeat(np.nan, len(data))
+
+ for i, match in enumerate(data):
+ for variable in match:
+ key = (variable['variable_name'], variable['cf_variable_name'])
+ unit = variable['variable_unit']
+ units[key] = str(unit) if unit is not None else 'UNKNOWN'
+ variables[key][i] = variable['variable_value']
+
+ for variable in variables:
+ # Create a variable for each data point
+ name = variable[0]
+ cf_name = variable[1]
+
+ data_variable = self.group.createVariable(
+ cf_name if cf_name is not None and cf_name != '' else name, 'f4', ('dim',), fill_value=-32767.0)
+ # Find min/max for data variables. It is possible for 'None' to
+ # be in this list, so filter those out when doing the calculation.
+ min_data = np.nanmin(variables[variable])
+ max_data = np.nanmax(variables[variable])
+ self.__enrichVariable(data_variable, min_data, max_data, has_depth=None, unit=units[variable])
+ data_variable[:] = np.ma.masked_invalid(variables[variable])
+ data_variable.long_name = name
+ data_variable.standard_name = cf_name
#
# Lists may include 'None" values, to calc min these must be filtered out
@@ -480,13 +533,13 @@ class DomsNetCDFValueWriter:
return min(x for x in var if x is not None)
@staticmethod
- def __enrichVariable(var, var_min, var_max, has_depth):
+ def __enrichVariable(var, var_min, var_max, has_depth, unit='UNKNOWN'):
coordinates = ['lat', 'lon', 'depth', 'time']
if not has_depth:
coordinates = ['lat', 'lon', 'time']
- var.units = 'UNKNOWN' # TODO populate this field once this metadata is in place
+ var.units = unit
var.valid_min = var_min
var.valid_max = var_max
var.coordinates = ' '.join(coordinates)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 591c5f6..b695c9e 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -163,8 +163,9 @@ class DomsInitializer:
measurement_time timestamp,
platform text,
device text,
- measurement_values map<text, decimal>,
+ measurement_values_json text,
depth decimal,
+ file_url text,
PRIMARY KEY (execution_id, is_primary, id)
);
"""
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
index 4007054..c3b95b0 100644
--- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py
+++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
@@ -46,5 +46,5 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
with ResultsStorage.ResultsRetrieval(self.config) as storage:
params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)
- return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
+ return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data),
computeOptions=None, executionId=execution_id)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index c989286..d0ead7e 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -16,6 +16,7 @@
import configparser
import json
import logging
+from time import sleep
import math
import uuid
from datetime import datetime
@@ -28,6 +29,13 @@ from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
from cassandra.query import BatchStatement
from pytz import UTC
from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
+from webservice.webmodel import NexusProcessingException
+
+BATCH_SIZE = 1024
+
+
+class ResultInsertException(IOError):
+ pass
class AbstractResultsContainer:
@@ -159,85 +167,92 @@ class ResultsStorage(AbstractResultsContainer):
cql = """
INSERT INTO doms_data
- (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary, depth)
+ (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url)
VALUES
- (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insertStatement = self._session.prepare(cql)
- batch = BatchStatement()
+
+ inserts = []
for result in results:
- self.__insertResult(execution_id, None, result, batch, insertStatement)
+ inserts.extend(self.__prepare_result(execution_id, None, result, insertStatement))
+
+ for i in range(5):
+ if not self.__insert_result_batches(inserts, insertStatement):
+ if i < 4:
+ self._log.warning('Some write attempts failed; retrying')
+ sleep(10)
+ else:
+ self._log.error('Some write attempts failed; max retries exceeded')
+ raise ResultInsertException('Some result inserts failed')
+ else:
+ break
+
+
+ def __insert_result_batches(self, insert_params, insertStatement):
+ query_batches = [insert_params[i:i + BATCH_SIZE] for i in range(0, len(insert_params), BATCH_SIZE)]
+ move_successful = True
+
+ n_inserts = len(insert_params)
+ writing = 0
+
+ self._log.info(f'Inserting {n_inserts} matchup entries in JSON format')
+
+ for batch in query_batches:
+ futures = []
+ writing += len(batch)
+ self._log.info(
+ f'Writing batch of {len(batch)} matchup entries | ({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]')
- self._session.execute(batch)
+ for entry in batch:
+ futures.append(self._session.execute_async(insertStatement, entry))
- def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
- data_dict = {}
+ for future in futures:
+ try:
+ future.result()
+ except Exception:
+ move_successful = False
+
+ self._log.info('Result data write attempt completed')
+ return move_successful
+
+ def __prepare_result(self, execution_id, primaryId, result, insertStatement):
if 'primary' in result:
- data_dict = result['primary']
+ data = result['primary']
elif 'secondary' in result:
- data_dict = result['secondary']
+ data = result['secondary']
+ else:
+ data = []
- dataMap = self.__buildDataMap(data_dict)
result_id = uuid.uuid4()
insert_params = (
- result_id,
- execution_id,
- result["id"],
- primaryId,
- result["lon"],
- result["lat"],
- result["source"],
- result["time"],
- result["platform"] if "platform" in result else None,
- result["device"] if "device" in result else None,
- dataMap,
- 1 if primaryId is None else 0,
- result["depth"]
- )
-
- try:
- batch.add(insertStatement, insert_params)
- except:
- self._log.error(f'Result batch INSERT preparation failed')
- self._log.error('INSERT params %s', str(insert_params))
- raise
+ result_id,
+ execution_id,
+ result["id"],
+ primaryId,
+ result["lon"],
+ result["lat"],
+ result["source"],
+ result["time"],
+ result["platform"] if "platform" in result else None,
+ result["device"] if "device" in result else None,
+ json.dumps(data, cls=DomsEncoder),
+ 1 if primaryId is None else 0,
+ result["depth"],
+ result['fileurl']
+ )
+
+ params_list = [insert_params]
- n = 0
if "matches" in result:
for match in result["matches"]:
- self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
- n += 1
- if n >= 20:
- if primaryId is None:
- self.__commitBatch(batch)
- n = 0
-
- if primaryId is None:
- self.__commitBatch(batch)
-
- def __commitBatch(self, batch):
- self._session.execute(batch)
- batch.clear()
-
- def __buildDataMap(self, result):
- dataMap = {}
- for data_dict in result:
- name = data_dict.get('cf_variable_name')
-
- if name is None:
- name = data_dict['variable_name']
+ params_list.extend(self.__prepare_result(execution_id, result["id"], match, insertStatement))
- value = data_dict['variable_value']
- if isinstance(value, np.generic):
- value = value.item()
+ return params_list
- if math.isnan(value):
- value = None
- dataMap[name] = value
- return dataMap
class ResultsRetrieval(AbstractResultsContainer):
@@ -292,30 +307,37 @@ class ResultsRetrieval(AbstractResultsContainer):
}
else:
entry = {
- "id": row.value_id,
- "lon": float(row.x),
- "lat": float(row.y),
- "source": row.source_dataset,
- "device": row.device,
"platform": row.platform,
+ "device": row.device,
+ "lon": str(row.x),
+ "lat": str(row.y),
+ "point": f"Point({float(row.x):.3f} {float(row.y):.3f})",
"time": row.measurement_time.replace(tzinfo=UTC),
- "depth": row.depth
+ "depth": float(row.depth) if row.depth is not None else None,
+ "fileurl": row.file_url if hasattr(row, 'file_url') else None,
+ "id": row.value_id,
+ "source": row.source_dataset,
}
- for key in row.measurement_values:
- value = float(row.measurement_values[key])
- entry[key] = value
+
+ # If doms_data uses the old schema, default to original behavior
+
+ try:
+ entry['primary' if row.is_primary else 'secondary'] = json.loads(row.measurement_values_json)
+ except AttributeError:
+ for key in row.measurement_values:
+ value = float(row.measurement_values[key])
+ entry[key] = value
+
return entry
def __retrieveStats(self, id):
- cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
+ cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
stats = {
- "numGriddedMatched": row.num_gridded_matched,
- "numGriddedChecked": row.num_gridded_checked,
- "numInSituMatched": row.num_insitu_matched,
- "numInSituChecked": row.num_insitu_checked,
- "timeToComplete": row.time_to_complete
+ "timeToComplete": row.time_to_complete,
+ "numSecondaryMatched": row.num_insitu_matched,
+ "numPrimaryMatched": row.num_gridded_matched,
}
return stats
@@ -325,18 +347,23 @@ class ResultsRetrieval(AbstractResultsContainer):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
+ matchup = row.matchup_datasets.split(",")
+
+ if len(matchup) == 1:
+ matchup = matchup[0]
+
params = {
"primary": row.primary_dataset,
- "matchup": row.matchup_datasets.split(","),
- "depthMin": row.depth_min,
- "depthMax": row.depth_max,
- "timeTolerance": row.time_tolerance,
- "radiusTolerance": row.radius_tolerance,
+ "matchup": matchup,
"startTime": row.start_time.replace(tzinfo=UTC),
"endTime": row.end_time.replace(tzinfo=UTC),
- "platforms": row.platforms,
"bbox": row.bounding_box,
- "parameter": row.parameter
+ "timeTolerance": int(row.time_tolerance) if row.time_tolerance is not None else None,
+ "radiusTolerance": float(row.radius_tolerance) if row.radius_tolerance is not None else None,
+ "platforms": row.platforms,
+ "parameter": row.parameter,
+ "depthMin": float(row.depth_min) if row.depth_min is not None else None,
+ "depthMax": float(row.depth_max) if row.depth_max is not None else None,
}
return params
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 6cd4676..2099ade 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -563,10 +563,15 @@ class DomsPoint(object):
) if var_value])
point.data = data
- if 'meta' in edge_point:
- point.data_id = edge_point['meta']
+ meta = edge_point.get('meta', None)
+
+ # Appending depth to data_id. Currently, our insitu data has the same id value for measurements taken at
+ # different depths. This causes secondary insitu matches to be incorrectly filtered out from NetCDF files.
+
+ if meta:
+ point.data_id = f'{meta}@{point.depth}'
else:
- point.data_id = f'{point.time}:{point.longitude}:{point.latitude}'
+ point.data_id = f'{point.time}:{point.longitude}:{point.latitude}@{point.depth}'
return point
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index a596fc3..b8250ee 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -533,7 +533,7 @@ paths:
required: true
schema:
type: string
- enum: ['CSV', 'NETCDF']
+ enum: ['CSV', 'NETCDF', 'JSON']
example: CSV
responses:
'200':
diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
index 273bd39..e0dabe2 100644
--- a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
+++ b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py
@@ -19,7 +19,7 @@ class NexusRendererFactory(object):
@classmethod
def get_renderer(cls, request):
- content_type = request.get_content_type()
+ content_type = request.get_content_type().upper()
if content_type in cls.content_types:
renderer_name = 'Nexus' + content_type + 'Renderer'
renderer = getattr(cls.module.nexus_tornado.request.renderers, renderer_name)
diff --git a/tools/update-doms-data-schema/update.py b/tools/update-doms-data-schema/update.py
new file mode 100644
index 0000000..1f08b12
--- /dev/null
+++ b/tools/update-doms-data-schema/update.py
@@ -0,0 +1,226 @@
+import argparse
+import configparser
+import decimal
+import json
+import logging
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
+ WhiteListRoundRobinPolicy)
+
+try:
+ logging.getLogger('webservice.NexusHandler').setLevel(logging.CRITICAL)
+except:
+ pass
+
+from webservice.algorithms.doms.DomsInitialization import DomsInitializer
+
+BATCH_SIZE = 1024
+log = logging.getLogger(__name__)
+
+
+class Encoder(json.JSONEncoder):
+ def __init__(self, **args):
+ json.JSONEncoder.__init__(self, **args)
+
+ def default(self, obj):
+ if isinstance(obj, decimal.Decimal):
+ return float(obj)
+ else:
+ return json.JSONEncoder.default(self, obj)
+
+
+def main():
+ domsconfig = configparser.ConfigParser()
+ domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
+
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('-u', '--cassandra-username',
+ dest='username', help='The username used to connect to Cassandra.',
+ required=True, metavar='USERNAME')
+
+ parser.add_argument('-p', '--cassandra-password',
+ dest='password', help='The password used to connect to Cassandra.',
+ required=True, metavar='PASSWORD')
+
+ parser.add_argument('--cassandra', help='The hostname(s) or IP(s) of the Cassandra server(s).',
+ required=False,
+ default=domsconfig.get("cassandra", "host"),
+ dest='hosts',
+ nargs='+',
+ metavar=('localhost', '127.0.0.101'))
+
+ parser.add_argument('--cassandraPort',
+ help='The port used to connect to Cassandra.',
+ dest='port',
+ required=False,
+ default=domsconfig.get("cassandra", "port"))
+
+ args = parser.parse_args()
+
+ cassHost = args.hosts
+ cassPort = args.port
+ cassUsername = args.username
+ cassPassword = args.password
+ cassKeyspace = domsconfig.get("cassandra", "keyspace")
+ cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+ cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+ cassPolicy = domsconfig.get("cassandra", "dc_policy")
+
+ log.info("Cassandra Host(s): %s" % (cassHost))
+ log.info("Cassandra Keyspace: %s" % (cassKeyspace))
+ log.info("Cassandra Datacenter: %s" % (cassDatacenter))
+ log.info("Cassandra Protocol Version: %s" % (cassVersion))
+ log.info("Cassandra DC Policy: %s" % (cassPolicy))
+ log.info("Cassandra Auth: %s : %s" % (cassUsername, cassPassword))
+
+ if cassPolicy == 'DCAwareRoundRobinPolicy':
+ dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+ token_policy = TokenAwarePolicy(dc_policy)
+ elif cassPolicy == 'WhiteListRoundRobinPolicy':
+ token_policy = WhiteListRoundRobinPolicy([cassHost])
+ else:
+ raise ValueError(cassPolicy)
+
+ if cassUsername and cassPassword:
+ auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
+ else:
+ auth_provider = None
+
+ try:
+ with Cluster([host for host in cassHost.split(',')],
+ port=int(cassPort),
+ execution_profiles={
+ EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=token_policy)
+ },
+ protocol_version=cassVersion,
+ auth_provider=auth_provider) as cluster:
+ session = cluster.connect(cassKeyspace)
+
+ cql = """
+ alter table doms_data
+ add measurement_values_json text;
+ """
+
+ log.info('Creating measurement_values_json column')
+
+ try:
+ session.execute(cql)
+ except:
+ log.warning('measurement_values_json column creation failed; perhaps it already exists')
+
+ cql = """
+ alter table doms_data
+ add file_url text;
+ """
+
+ log.info('Creating file_url column')
+
+ try:
+ session.execute(cql)
+ except:
+ log.warning('file_url column creation failed; perhaps it already exists')
+
+ for i in range(5):
+ if not move_data(session):
+ if i < 4:
+ log.warning('Some move attempts failed; retrying')
+ else:
+ log.critical('Some move attempts failed; max retries exceeded')
+ exit(1)
+ else:
+ break
+
+ cql = """
+ alter table doms_data
+ drop measurement_values;
+ """
+
+ log.info('Dropping old measurement_values column')
+ session.execute(cql)
+ except NoHostAvailable as e:
+ log.error("Unable to connect to Cassandra, Nexus will not be able to access local data ", e)
+ except Exception as e:
+ log.critical('An uncaught exception occurred')
+ log.exception(e)
+ exit(2)
+
+
+def move_data(session):
+ cql = """
+ SELECT execution_id, is_primary, id, measurement_values FROM doms_data;
+ """
+
+ log.info('Fetching execution measurements')
+
+ try:
+ rows = session.execute(cql)
+ except:
+ log.warning('SELECT query failed; the measurement_values column may no longer exist')
+ exit(0)
+
+ update_params = []
+
+ for row in rows:
+ if row.measurement_values is not None:
+ update_params.append((
+ json.dumps(translate_values(dict(row.measurement_values)), indent=4, cls=Encoder), # values
+ row.execution_id, # execution_id
+ row.is_primary, # is_primary
+ row.id, # id
+ ))
+
+ update_cql = """
+ UPDATE doms_data
+ SET measurement_values=null,
+ measurement_values_json=?
+ WHERE execution_id=? AND
+ is_primary=? AND
+ id=?;
+ """
+
+ update_query = session.prepare(update_cql)
+ query_batches = [update_params[i:i + BATCH_SIZE] for i in range(0, len(update_params), BATCH_SIZE)]
+ move_successful = True
+
+ n_entries = len(update_params)
+ writing = 0
+
+ log.info(f'Writing {n_entries} entries in JSON format')
+
+ for batch in query_batches:
+ futures = []
+ writing += len(batch)
+ log.info(f'Writing batch of {len(batch)} entries | ({writing}/{n_entries}) [{writing/n_entries*100:7.3f}%]')
+
+ for entry in batch:
+ futures.append(session.execute_async(update_query, entry))
+
+ for future in futures:
+ try:
+ future.result()
+ except Exception:
+ move_successful = False
+
+ log.info('Move attempt completed')
+ return move_successful
+
+
+def translate_values(values_dict):
+ values = []
+
+ for key in values_dict:
+ values.append({
+ "variable_name": "",
+ "cf_variable_name": key,
+ "variable_value": values_dict[key],
+ "variable_unit": None
+ })
+
+ return values
+
+
+if __name__ == '__main__':
+ main()