You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2022/06/01 18:53:20 UTC

[incubator-sdap-nexus] branch master updated: SDAP-387: Update NetCDF matchup output format (#159)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 884410c  SDAP-387: Update NetCDF matchup output format (#159)
884410c is described below

commit 884410c6a1e9040a0f2811ecec606ceca9a00872
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Wed Jun 1 11:53:16 2022 -0700

    SDAP-387: Update NetCDF matchup output format (#159)
    
    * NetCDF matchup output
    
    * Updated netCDF builder code to look at new param names
    
    * Integrated Vardis feedback
    
    * Remove todos, add newline to end of file
    
    * Fixed failing test
---
 .../tests/algorithms/test_base_doms_handler.py     |  70 ++++--
 .../webservice/algorithms/doms/BaseDomsHandler.py  | 260 +++++++--------------
 .../request/renderers/NexusNETCDFRenderer.py       |   4 +-
 3 files changed, 145 insertions(+), 189 deletions(-)

diff --git a/analysis/tests/algorithms/test_base_doms_handler.py b/analysis/tests/algorithms/test_base_doms_handler.py
index dbb579f..625e9b1 100644
--- a/analysis/tests/algorithms/test_base_doms_handler.py
+++ b/analysis/tests/algorithms/test_base_doms_handler.py
@@ -1,20 +1,19 @@
 import datetime
 import uuid
 import csv
-from webservice.algorithms.doms.BaseDomsHandler import DomsCSVFormatter
+import pytest
+from netCDF4 import Dataset
+import io
+from webservice.algorithms.doms.BaseDomsHandler import DomsCSVFormatter, DomsNetCDFFormatter
 
 
-def test_csv():
-    """
-    Test that CSV is constructed properly given result, params, and
-    details.
-    """
-    test_execution_id = str(uuid.uuid4())
+@pytest.fixture()
+def test_input():
     results = [
         {
             "id": "9c08c026-eff7-30a7-ab1e-413a64f507ff[[0 0 3]]",
-            "x": 173.375,
-            "y": -29.875,
+            "lon": 173.375,
+            "lat": -29.875,
             "source": "MUR25-JPL-L4-GLOB-v04.2",
             "device": "radiometers",
             "platform": "orbiting satellite",
@@ -25,8 +24,8 @@ def test_csv():
             "matches": [
                 {
                     "id": "PCEWYL",
-                    "x": 173.38,
-                    "y": -29.88,
+                    "lon": 173.38,
+                    "lat": -29.88,
                     "source": "icoads",
                     "device": None,
                     "platform": "drifting surface float",
@@ -37,8 +36,8 @@ def test_csv():
         },
         {
             "id": "8ff1b246-16de-34e2-87bb-600c4107a7f8[[ 0  8 15]]",
-            "x": 161.375,
-            "y": -27.875,
+            "lon": 161.375,
+            "lat": -27.875,
             "source": "MUR25-JPL-L4-GLOB-v04.2",
             "device": "radiometers",
             "platform": "orbiting satellite",
@@ -49,8 +48,8 @@ def test_csv():
             "matches": [
                 {
                     "id": "PCY3CI",
-                    "x": 161.38,
-                    "y": -27.88,
+                    "lon": 161.38,
+                    "lat": -27.88,
                     "source": "icoads",
                     "device": None,
                     "platform": "drifting surface float",
@@ -81,6 +80,18 @@ def test_csv():
         'timeToComplete': 26
     }
 
+    yield results, params, details
+
+
+def test_csv(test_input):
+    """
+    Test that CSV is constructed properly given result, params, and
+    details.
+    """
+    test_execution_id = str(uuid.uuid4())
+
+    results, params, details = test_input
+
     csv_formatter = DomsCSVFormatter()
     csv_result = csv_formatter.create(
         executionId=test_execution_id,
@@ -111,3 +122,32 @@ def test_csv():
 
         if 'id' == row[0]:
             header = row
+
+
+def test_netcdf(test_input):
+    """
+    Test that the /domsresults endpoint results in a properly
+    structured NetCDF file.
+    """
+    test_execution_id = str(uuid.uuid4())
+
+    results, params, details = test_input
+
+    nc_formatter = DomsNetCDFFormatter()
+    nc_result = nc_formatter.create(
+        executionId=test_execution_id,
+        results=results,
+        params=params,
+        details=details
+    )
+
+    ds = Dataset('test', memory=nc_result)
+
+    assert 'PrimaryData' in ds.groups
+    assert 'SecondaryData' in ds.groups
+
+    assert 'sst_anomaly' in ds.groups['PrimaryData'].variables
+    assert 'analysis_error' in ds.groups['PrimaryData'].variables
+    assert 'analysed_sst' in ds.groups['PrimaryData'].variables
+
+    assert 'sea_water_temperature' in ds.groups['SecondaryData'].variables
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index b5834f9..bb32146 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -21,6 +21,7 @@ from datetime import datetime
 import time
 import itertools
 import importlib_metadata
+from collections import defaultdict
 from decimal import Decimal
 
 import numpy as np
@@ -258,11 +259,11 @@ class DomsNetCDFFormatter:
     @staticmethod
     def create(executionId, results, params, details):
 
-        t = tempfile.mkstemp(prefix="doms_", suffix=".nc")
+        t = tempfile.mkstemp(prefix="cdms_", suffix=".nc")
         tempFileName = t[1]
 
         dataset = Dataset(tempFileName, "w", format="NETCDF4")
-        dataset.DOMS_matchID = executionId
+        dataset.CDMS_matchID = executionId
         DomsNetCDFFormatter.__addNetCDFConstants(dataset)
 
         dataset.date_modified = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
@@ -270,9 +271,9 @@ class DomsNetCDFFormatter:
         dataset.time_coverage_start = params["startTime"].strftime(ISO_8601)
         dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
         dataset.time_coverage_resolution = "point"
-        dataset.DOMS_match_up = params["matchup"]
-        dataset.DOMS_num_matchup_matched = details["numInSituMatched"]
-        dataset.DOMS_num_primary_matched = details["numGriddedMatched"]
+        dataset.CDMS_secondary = params["matchup"]
+        dataset.CDMS_num_matchup_matched = details["numInSituMatched"]
+        dataset.CDMS_num_primary_matched = details["numGriddedMatched"]
 
         bbox = geo.BoundingBox(asString=params["bbox"])
         dataset.geospatial_lat_max = bbox.north
@@ -286,22 +287,22 @@ class DomsNetCDFFormatter:
         dataset.geospatial_vertical_units = "m"
         dataset.geospatial_vertical_positive = "down"
 
-        dataset.DOMS_TimeWindow = params["timeTolerance"] / 60 / 60
-        dataset.DOMS_TimeWindow_Units = "hours"
-        dataset.DOMS_SearchRadius = float(params["radiusTolerance"])
-        dataset.DOMS_SearchRadius_Units = "m"
+        dataset.CDMS_TimeWindow = params["timeTolerance"] / 60 / 60
+        dataset.CDMS_TimeWindow_Units = "hours"
+        dataset.CDMS_SearchRadius = float(params["radiusTolerance"])
+        dataset.CDMS_SearchRadius_Units = "m"
         dataset.URI_Matchup = "https://doms.jpl.nasa.gov/domsresults?id=" + executionId + "&output=NETCDF"
-        dataset.DOMS_ParameterPrimary = params["parameter"] if "parameter" in params else ""
-        dataset.DOMS_platforms = params["platforms"]
-        dataset.DOMS_primary = params["primary"]
-        dataset.DOMS_time_to_complete = details["timeToComplete"]
-        dataset.DOMS_time_to_complete_units = "seconds"
+        dataset.CDMS_ParameterPrimary = params["parameter"] if "parameter" in params else ""
+        dataset.CDMS_platforms = params["platforms"]
+        dataset.CDMS_primary = params["primary"]
+        dataset.CDMS_time_to_complete = details["timeToComplete"]
+        dataset.CDMS_time_to_complete_units = "seconds"
 
         insituDatasets = params["matchup"]
         insituLinks = set()
         for insitu in insituDatasets:
             insituLinks.add(config.METADATA_LINKS[insitu])
-        dataset.DOMS_DatasetMetadata = ', '.join(insituLinks)
+        dataset.CDMS_DatasetMetadata = ', '.join(insituLinks)
 
         platforms = set()
         for primaryValue in results:
@@ -310,8 +311,8 @@ class DomsNetCDFFormatter:
                 platforms.add(match['platform'])
         dataset.platform = ', '.join(platforms)
 
-        satellite_group_name = "SatelliteData"
-        insitu_group_name = "InsituData"
+        satellite_group_name = 'PrimaryData'
+        insitu_group_name = "SecondaryData"
 
         #Create Satellite group, variables, and attributes
         satelliteGroup = dataset.createGroup(satellite_group_name)
@@ -338,15 +339,15 @@ class DomsNetCDFFormatter:
     @staticmethod
     def __addNetCDFConstants(dataset):
         dataset.product_version = "1.0"
-        dataset.Conventions = "CF-1.6, ACDD-1.3"
-        dataset.title = "DOMS satellite-insitu machup output file"
-        dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+        dataset.Conventions = "CF-1.8, ACDD-1.3"
+        dataset.title = "CDMS satellite-insitu matchup output file"
+        dataset.history = "Processing_Version = V1.0, Software_Name = CDMS, Software_Version = 1.03"
         dataset.institution = "JPL, FSU, NCAR, Saildrone"
         dataset.source = "doms.jpl.nasa.gov"
         dataset.standard_name_vocabulary = "CF Standard Name Table v27", "BODC controlled vocabulary"
         dataset.cdm_data_type = "Point/Profile, Swath/Grid"
         dataset.processing_level = "4"
-        dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+        dataset.project = "Cloud-Based Data Matchup Service (CDMS)"
         dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
         dataset.keywords = "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, NASA/JPL/PODAAC, " \
                            "FSU/COAPS, UCAR/NCAR, SALINITY, SEA SURFACE TEMPERATURE, SURFACE WINDS"
@@ -396,47 +397,51 @@ class DomsNetCDFValueWriter:
         self.lat = []
         self.lon = []
         self.time = []
-        self.sea_water_salinity = []
-        self.wind_speed = []
-        self.wind_u = []
-        self.wind_v = []
-        self.wind_direction = []
-        self.sea_water_temperature = []
         self.depth = []
 
-        self.satellite_group_name = "SatelliteData"
-        self.insitu_group_name = "InsituData"
-
-        #
-        # Only include the depth variable related to the match-up parameter. If the match-up parameter is
-        # not sss or sst then do not include any depth data, just fill values.
-        #
-        if matchup_parameter == "sss":
-            self.matchup_depth = "sea_water_salinity_depth"
-        elif matchup_parameter == "sst":
-            self.matchup_depth = "sea_water_temperature_depth"
-        else:
-            self.matchup_depth = "NO_DEPTH"
-
-    def addData(self, value):
-        self.lat.append(value.get("y", None))
-        self.lon.append(value.get("x", None))
-        self.time.append(time.mktime(value.get("time").timetuple()))
-        self.sea_water_salinity.append(value.get("sea_water_salinity", None))
-        self.wind_speed.append(value.get("wind_speed", None))
-        self.wind_u.append(value.get("wind_u", None))
-        self.wind_v.append(value.get("wind_v", None))
-        self.wind_direction.append(value.get("wind_direction", None))
-        self.sea_water_temperature.append(value.get("sea_water_temperature", None))
-        self.depth.append(value.get(self.matchup_depth, None))
+        self.primary_group_name = "PrimaryData"
+        self.secondary_group_name = "SecondaryData"
+        self.data_map = defaultdict(list)
+
+    def addData(self, result_item):
+        """
+        Populate DomsNetCDFValueWriter fields from matchup results dict
+        """
+        non_data_fields = [
+            'id', 'lon', 'lat',
+            'source', 'device',
+            'platform', 'time', 'matches'
+        ]
+        self.lat.append(result_item.get('lat', None))
+        self.lon.append(result_item.get('lon', None))
+        self.time.append(time.mktime(result_item.get('time').timetuple()))
+
+        # All other variables are assumed to be science variables.
+        # Add DataPoints accordingly.
+        for key, value in result_item.items():
+            if 'depth' in key:
+                self.depth.append(result_item.get(key))
+                continue
+            if key not in non_data_fields:
+                if len(self.data_map[key]) != len(self.lat) - 1:
+                    # If the counts mismatch, fill this variable with
+                    # None so the data matches the size
+                    size_diff = len(self.lat) - len(self.data_map[key]) - 1
+                    self.data_map[key].extend([None] * size_diff)
+                self.data_map[key].append(value)
+
+        # Check if there are any variables that were not appended to.
+        # Append None, meaning that value is empty.
+        for var_name in set(self.data_map.keys()) - set(result_item.keys()):
+            self.data_map[var_name].append(None)
 
     def writeGroup(self):
         #
         # Create variables, enrich with attributes, and add data
         #
-        lonVar = self.group.createVariable("lon", "f4", ("dim",), fill_value=-32767.0)
-        latVar = self.group.createVariable("lat", "f4", ("dim",), fill_value=-32767.0)
-        timeVar = self.group.createVariable("time", "f4", ("dim",), fill_value=-32767.0)
+        lonVar = self.group.createVariable('lon', 'f4', ('dim',), fill_value=-32767.0)
+        latVar = self.group.createVariable('lat', 'f4', ('dim',), fill_value=-32767.0)
+        timeVar = self.group.createVariable('time', 'f4', ('dim',), fill_value=-32767.0)
 
         self.__enrichLon(lonVar, min(self.lon), max(self.lon))
         self.__enrichLat(latVar, min(self.lat), max(self.lat))
@@ -446,54 +451,23 @@ class DomsNetCDFValueWriter:
         lonVar[:] = self.lon
         timeVar[:] = self.time
 
-        if self.sea_water_salinity.count(None) != len(self.sea_water_salinity):
-            if self.group.name == self.satellite_group_name:
-                sssVar = self.group.createVariable("SeaSurfaceSalinity", "f4", ("dim",), fill_value=-32767.0)
-                self.__enrichSSSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
-            else:  # group.name == self.insitu_group_name
-                sssVar = self.group.createVariable("SeaWaterSalinity", "f4", ("dim",), fill_value=-32767.0)
-                self.__enrichSWSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
-            sssVar[:] = self.sea_water_salinity
-
-        if self.wind_speed.count(None) != len(self.wind_speed):
-            windSpeedVar = self.group.createVariable("WindSpeed", "f4", ("dim",), fill_value=-32767.0)
-            self.__enrichWindSpeed(windSpeedVar, self.__calcMin(self.wind_speed), max(self.wind_speed))
-            windSpeedVar[:] = self.wind_speed
-
-        if self.wind_u.count(None) != len(self.wind_u):
-            windUVar = self.group.createVariable("WindU", "f4", ("dim",), fill_value=-32767.0)
-            windUVar[:] = self.wind_u
-            self.__enrichWindU(windUVar, self.__calcMin(self.wind_u), max(self.wind_u))
-
-        if self.wind_v.count(None) != len(self.wind_v):
-            windVVar = self.group.createVariable("WindV", "f4", ("dim",), fill_value=-32767.0)
-            windVVar[:] = self.wind_v
-            self.__enrichWindV(windVVar, self.__calcMin(self.wind_v), max(self.wind_v))
-
-        if self.wind_direction.count(None) != len(self.wind_direction):
-            windDirVar = self.group.createVariable("WindDirection", "f4", ("dim",), fill_value=-32767.0)
-            windDirVar[:] = self.wind_direction
-            self.__enrichWindDir(windDirVar)
-
-        if self.sea_water_temperature.count(None) != len(self.sea_water_temperature):
-            if self.group.name == self.satellite_group_name:
-                tempVar = self.group.createVariable("SeaSurfaceTemp", "f4", ("dim",), fill_value=-32767.0)
-                self.__enrichSurfaceTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
-            else:
-                tempVar = self.group.createVariable("SeaWaterTemp", "f4", ("dim",), fill_value=-32767.0)
-                self.__enrichWaterTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
-            tempVar[:] = self.sea_water_temperature
-
-        if self.group.name == self.insitu_group_name:
-            depthVar = self.group.createVariable("Depth", "f4", ("dim",), fill_value=-32767.0)
-
-            if self.depth.count(None) != len(self.depth):
-                self.__enrichDepth(depthVar, self.__calcMin(self.depth), max(self.depth))
-                depthVar[:] = self.depth
-            else:
-                # If depth has no data, set all values to 0
-                tempDepth = [0 for x in range(len(self.depth))]
-                depthVar[:] = tempDepth
+        # Add depth variable, if present
+        if self.depth:
+            depthVar = self.group.createVariable('depth', 'f4', ('dim',), fill_value=-32767.0)
+            self.__enrichDepth(depthVar, self.__calcMin(self.depth), max(self.depth))
+            depthVar[:] = self.depth
+
+        for variable_name, data in self.data_map.items():
+            # Create a variable for each data point
+            data_variable = self.group.createVariable(variable_name, 'f4', ('dim',), fill_value=-32767.0)
+            # Find min/max for data variables. It is possible for 'None' to
+            # be in this list, so filter those out when doing the calculation.
+            min_data = min(val for val in data if val is not None)
+            max_data = max(val for val in data if val is not None)
+            self.__enrichVariable(data_variable, min_data, max_data, has_depth=self.depth)
+            data_variable[:] = data
+            data_variable.long_name = variable_name
+            data_variable.standard_name = variable_name
 
     #
     # Lists may include 'None" values, to calc min these must be filtered out
@@ -502,6 +476,17 @@ class DomsNetCDFValueWriter:
     def __calcMin(var):
         return min(x for x in var if x is not None)
 
+    @staticmethod
+    def __enrichVariable(var, var_min, var_max, has_depth):
+        coordinates = ['lat', 'lon', 'depth', 'time']
+
+        if not has_depth:
+            coordinates = ['lat', 'lon', 'time']
+
+        var.units = 'UNKNOWN'  # TODO populate this field once this metadata is in place
+        var.valid_min = var_min
+        var.valid_max = var_max
+        var.coordinates = ' '.join(coordinates)
 
     #
     # Add attributes to each variable
@@ -531,24 +516,6 @@ class DomsNetCDFValueWriter:
         var.axis = "T"
         var.units = "seconds since 1970-01-01 00:00:00 0:00"
 
-    @staticmethod
-    def __enrichSSSMeasurements(var, var_min, var_max):
-        var.long_name = "Sea surface salinity"
-        var.standard_name = "sea_surface_salinity"
-        var.units = "1e-3"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat time"
-
-    @staticmethod
-    def __enrichSWSMeasurements(var, var_min, var_max):
-        var.long_name = "Sea water salinity"
-        var.standard_name = "sea_water_salinity"
-        var.units = "1e-3"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat depth time"
-
     @staticmethod
     def __enrichDepth(var, var_min, var_max):
         var.valid_min = var_min
@@ -559,54 +526,3 @@ class DomsNetCDFValueWriter:
         var.axis = "Z"
         var.positive = "Down"
 
-    @staticmethod
-    def __enrichWindSpeed(var, var_min, var_max):
-        var.long_name = "Wind speed"
-        var.standard_name = "wind_speed"
-        var.units = "m s-1"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat depth time"
-
-    @staticmethod
-    def __enrichWindU(var, var_min, var_max):
-        var.long_name = "Eastward wind"
-        var.standard_name = "eastward_wind"
-        var.units = "m s-1"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat depth time"
-
-    @staticmethod
-    def __enrichWindV(var, var_min, var_max):
-        var.long_name = "Northward wind"
-        var.standard_name = "northward_wind"
-        var.units = "m s-1"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat depth time"
-
-    @staticmethod
-    def __enrichWaterTemp(var, var_min, var_max):
-        var.long_name = "Sea water temperature"
-        var.standard_name = "sea_water_temperature"
-        var.units = "degree_C"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat depth time"
-
-    @staticmethod
-    def __enrichSurfaceTemp(var, var_min, var_max):
-        var.long_name = "Sea surface temperature"
-        var.standard_name = "sea_surface_temperature"
-        var.units = "degree_C"
-        var.valid_min = var_min
-        var.valid_max = var_max
-        var.coordinates = "lon lat time"
-
-    @staticmethod
-    def __enrichWindDir(var):
-        var.long_name = "Wind from direction"
-        var.standard_name = "wind_from_direction"
-        var.units = "degree"
-        var.coordinates = "lon lat depth time"
diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusNETCDFRenderer.py b/analysis/webservice/nexus_tornado/request/renderers/NexusNETCDFRenderer.py
index 9d6ca92..2476419 100644
--- a/analysis/webservice/nexus_tornado/request/renderers/NexusNETCDFRenderer.py
+++ b/analysis/webservice/nexus_tornado/request/renderers/NexusNETCDFRenderer.py
@@ -11,7 +11,7 @@ class NexusNETCDFRenderer(object):
         tornado_handler.set_header("Content-Type", "application/x-netcdf")
         tornado_handler.set_header("Content-Disposition", "filename=\"%s\"" % self._request.get_argument('filename', "download.nc"))
         try:
-            self.write(result.toNetCDF())
+            tornado_handler.write(result.toNetCDF())
         except:
             traceback.print_exc(file=sys.stdout)
-            raise NexusProcessingException(reason="Unable to convert results to NetCDF.")
\ No newline at end of file
+            raise NexusProcessingException(reason="Unable to convert results to NetCDF.")