You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by fg...@apache.org on 2019/08/20 18:18:26 UTC

[incubator-sdap-nexus] branch 1.1.0-SNAPSHOT updated: SDAP-144 add quality flag support for DOMS (#74)

This is an automated email from the ASF dual-hosted git repository.

fgreg pushed a commit to branch 1.1.0-SNAPSHOT
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/1.1.0-SNAPSHOT by this push:
     new 1932a69  SDAP-144 add quality flag support for DOMS  (#74)
1932a69 is described below

commit 1932a69cfe9a66ba45faf2e8ba3d0e18bebd4a08
Author: echyam <ec...@users.noreply.github.com>
AuthorDate: Tue Aug 20 11:18:22 2019 -0700

    SDAP-144 add quality flag support for DOMS  (#74)
    
    * add quality flag support for DOMS issue SDAP-144
    
    * Formatting tweaks and couple bug fixes. Allow for None qualityFlag API parameter. Change from wind_direction_quality to wind_component_quality
---
 .../algorithms/doms/DomsInitialization.py          |  7 +++-
 .../webservice/algorithms/doms/ResultsStorage.py   | 33 ++++++++++-----
 analysis/webservice/algorithms_spark/Matchup.py    | 49 ++++++++++++++++++----
 3 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 21832c1..65d5009 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -103,7 +103,8 @@ class DomsInitializer:
               end_time timestamp,
               platforms text,
               bounding_box text,
-              parameter text
+              parameter text,
+              quality_flag int
             );
         """
         session.execute(cql)
@@ -122,6 +123,10 @@ class DomsInitializer:
               y decimal,
               source_dataset text,
               measurement_time timestamp,
+              wind_speed_quality int,
+              wind_component_quality int,
+              sst_quality int,
+              sss_quality int,
               platform text,
               device text,
               measurement_values map<text, decimal>,
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 03bbd09..7b5de10 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 
-
 import ConfigParser
 import logging
 import uuid
@@ -86,9 +85,9 @@ class ResultsStorage(AbstractResultsContainer):
 
     def __insertParams(self, execution_id, params):
         cql = """INSERT INTO doms_params
-                    (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
+                    (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter, quality_flag)
                  VALUES
-                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
         """
         self._session.execute(cql, (execution_id,
                                     params["primary"],
@@ -102,7 +101,8 @@ class ResultsStorage(AbstractResultsContainer):
                                     params["endTime"],
                                     params["platforms"],
                                     params["bbox"],
-                                    params["parameter"]
+                                    params["parameter"],
+                                    params["qualityFlag"] if "qualityFlag" in params.keys() else None
                                     ))
 
     def __insertStats(self, execution_id, stats):
@@ -125,9 +125,9 @@ class ResultsStorage(AbstractResultsContainer):
 
         cql = """
            INSERT INTO doms_data
-                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
+                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, wind_speed_quality, wind_component_quality, sst_quality, sss_quality, platform, device, measurement_values, is_primary)
            VALUES
-                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         """
         insertStatement = self._session.prepare(cql)
         batch = BatchStatement()
@@ -150,6 +150,10 @@ class ResultsStorage(AbstractResultsContainer):
             result["y"],
             result["source"],
             result["time"],
+            result["wind_speed_quality"],
+            result["wind_component_quality"],
+            result["sea_water_temperature_quality"],
+            result["sea_water_salinity_quality"],
             result["platform"] if "platform" in result else None,
             result["device"] if "device" in result else None,
             dataMap,
@@ -178,8 +182,8 @@ class ResultsStorage(AbstractResultsContainer):
         dataMap = {}
         for name in result:
             value = result[name]
-            if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type(
-                    value) in [float, int]:
+            if name not in ["id", "x", "y", "source", "time", "platform", "device", "point",
+                            "matches"] and value is not None and type(value) in [float, int]:
                 dataMap[name] = value
         return dataMap
 
@@ -223,6 +227,7 @@ class ResultsRetrieval(AbstractResultsContainer):
         dataMap = {}
         for row in rows:
             entry = self.__rowToDataEntry(row, trim_data=trim_data)
+
             dataMap[row.value_id] = entry
         return dataMap
 
@@ -242,11 +247,16 @@ class ResultsRetrieval(AbstractResultsContainer):
                 "source": row.source_dataset,
                 "device": row.device,
                 "platform": row.platform,
-                "time": row.measurement_time.replace(tzinfo=UTC)
+                "time": row.measurement_time.replace(tzinfo=UTC),
+                "wind_speed_quality": row.wind_speed_quality,
+                "wind_component_quality": row.wind_component_quality,
+                "sea_water_temperature_quality": row.sst_quality,
+                "sea_water_salinity_quality": row.sss_quality
             }
+
         for key in row.measurement_values:
             value = float(row.measurement_values[key])
-            entry[key] = value
+            entry[key] = entry[key] if key in entry else value
         return entry
 
     def __retrieveStats(self, id):
@@ -279,7 +289,8 @@ class ResultsRetrieval(AbstractResultsContainer):
                 "endTime": row.end_time.replace(tzinfo=UTC),
                 "platforms": row.platforms,
                 "bbox": row.bounding_box,
-                "parameter": row.parameter
+                "parameter": row.parameter,
+                "qualityFlag": row.quality_flag
             }
             return params
 
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index cd21f17..6fc5905 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -31,7 +31,6 @@ from shapely import wkt
 from shapely.geometry import Point
 from shapely.geometry import box
 from shapely.geos import ReadingError
-
 from webservice.NexusHandler import SparkHandler, nexus_handler
 from webservice.algorithms.doms import config as edge_endpoints
 from webservice.algorithms.doms import values as doms_values
@@ -70,6 +69,12 @@ class Matchup(SparkHandler):
             "type": "string",
             "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
         },
+        "qualityFlag": {
+            "name": "In-Situ Quality Flag",
+            "type": "int",
+            "description": "The quality of in-situ data to be used for the match up. It must be an IODE primary level "
+                           "flag. Optional. Default: no quality filter applied"
+        },
         "startTime": {
             "name": "Start Time",
             "type": "string",
@@ -180,6 +185,11 @@ class Matchup(SparkHandler):
             raise NexusProcessingException(
                 reason="Depth Min should be less than Depth Max", code=400)
 
+        quality_flag = request.get_int_arg('qualityFlag', default=None)
+        if quality_flag not in [1, 2, 3, 4, 9, None]:
+            raise NexusProcessingException(
+                reason="Quality flag not a IODE primary level flag value", code=400)
+
         time_tolerance = request.get_int_arg('tt', default=86400)
         radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
         platforms = request.get_argument('platforms', None)
@@ -201,7 +211,7 @@ class Matchup(SparkHandler):
 
         return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
                start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
-               depth_min, depth_max, time_tolerance, radius_tolerance, \
+               depth_min, depth_max, quality_flag, time_tolerance, radius_tolerance, \
                platforms, match_once, result_size_limit
 
     def calc(self, request, **args):
@@ -209,7 +219,7 @@ class Matchup(SparkHandler):
         # TODO Assuming Satellite primary
         bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
         start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
-        depth_min, depth_max, time_tolerance, radius_tolerance, \
+        depth_min, depth_max, quality_flag, time_tolerance, radius_tolerance, \
         platforms, match_once, result_size_limit = self.parse_arguments(request)
 
         with ResultsStorage() as resultsStorage:
@@ -229,7 +239,8 @@ class Matchup(SparkHandler):
         self.log.debug("Calling Spark Driver")
         try:
             spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
-                                                matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+                                                matchup_ds_names, parameter_s, depth_min, depth_max, quality_flag,
+                                                time_tolerance,
                                                 radius_tolerance, platforms, match_once, sc=self._sc)
         except Exception as e:
             self.log.exception(e)
@@ -247,7 +258,7 @@ class Matchup(SparkHandler):
             "timeTolerance": time_tolerance,
             "radiusTolerance": float(radius_tolerance),
             "platforms": platforms,
-            "parameter": parameter_s
+            "parameter": parameter_s,
         }
 
         if depth_min is not None:
@@ -256,6 +267,9 @@ class Matchup(SparkHandler):
         if depth_max is not None:
             args["depthMax"] = float(depth_max)
 
+        if quality_flag is not None:
+            args["qualityFlag"] = int(quality_flag)
+
         total_keys = len(spark_result.keys())
         total_values = sum(len(v) for v in spark_result.itervalues())
         details = {
@@ -301,10 +315,14 @@ class Matchup(SparkHandler):
         return {
             "sea_water_temperature": domspoint.sst,
             "sea_water_temperature_depth": domspoint.sst_depth,
+            "sea_water_temperature_quality": domspoint.sst_quality,
             "sea_water_salinity": domspoint.sss,
             "sea_water_salinity_depth": domspoint.sss_depth,
+            "sea_water_salinity_quality": domspoint.sss_quality,
             "wind_speed": domspoint.wind_speed,
             "wind_direction": domspoint.wind_direction,
+            "wind_speed_quality": domspoint.wind_speed_quality,
+            "wind_component_quality": domspoint.wind_component_quality,
             "wind_u": domspoint.wind_u,
             "wind_v": domspoint.wind_v,
             "platform": doms_values.getPlatformById(domspoint.platform),
@@ -332,10 +350,14 @@ class DomsPoint(object):
         self.wind_v = None
         self.wind_direction = None
         self.wind_speed = None
+        self.wind_speed_quality = None
+        self.wind_component_quality = None
         self.sst = None
         self.sst_depth = None
+        self.sst_quality = None
         self.sss = None
         self.sss_depth = None
+        self.sss_quality = None
         self.source = None
         self.depth = None
         self.platform = None
@@ -414,10 +436,14 @@ class DomsPoint(object):
         point.wind_v = edge_point.get('northward_wind')
         point.wind_direction = edge_point.get('wind_direction')
         point.wind_speed = edge_point.get('wind_speed')
+        point.wind_speed_quality = edge_point.get('wind_speed_quality')
+        point.wind_component_quality = edge_point.get('wind_component_quality')
         point.sst = edge_point.get('sea_water_temperature')
         point.sst_depth = edge_point.get('sea_water_temperature_depth')
+        point.sst_quality = edge_point.get('sea_water_temperature_quality')
         point.sss = edge_point.get('sea_water_salinity')
         point.sss_depth = edge_point.get('sea_water_salinity_depth')
+        point.sss_quality = edge_point.get('sea_water_salinity_quality')
         point.source = edge_point.get('source')
         point.platform = edge_point.get('platform')
         point.device = edge_point.get('device')
@@ -437,6 +463,7 @@ DRIVER_LOCK = Lock()
 
 
 def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+                         quality_flag,
                          time_tolerance, radius_tolerance, platforms, match_once, sc=None):
     from functools import partial
 
@@ -446,6 +473,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_nam
         matchup_b = sc.broadcast(matchup_ds_names)
         depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
         depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+        quality_flag_b = sc.broadcast(int(quality_flag) if quality_flag is not None else None)
         tt_b = sc.broadcast(time_tolerance)
         rt_b = sc.broadcast(float(radius_tolerance))
         platforms_b = sc.broadcast(platforms)
@@ -459,7 +487,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_nam
     rdd_filtered = rdd.mapPartitions(
         partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
                 rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
-                depth_max_b=depth_max_b), preservesPartitioning=True) \
+                depth_max_b=depth_max_b, quality_flag_b=quality_flag_b), preservesPartitioning=True) \
         .filter(lambda p_m_tuple: abs(
         iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
 
@@ -527,7 +555,7 @@ def add_meters_to_lon_lat(lon, lat, meters):
 
 
 def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
-                              bounding_wkt_b, depth_min_b, depth_max_b):
+                              bounding_wkt_b, depth_min_b, depth_max_b, quality_flag_b):
     the_time = datetime.now()
     tile_ids = list(tile_ids)
     if len(tile_ids) == 0:
@@ -571,7 +599,8 @@ def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b,
             bbox = ','.join(
                 [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
             edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
-                                       platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+                                       platforms_b.value, depth_min_b.value, depth_max_b.value, quality_flag_b.value,
+                                       session=edge_session)
             if edge_response['totalResults'] == 0:
                 continue
             r = edge_response['results']
@@ -655,7 +684,8 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
                 yield p_doms_point, m_doms_point
 
 
-def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, quality_flag,
+               itemsPerPage=1000,
                startIndex=0, stats=True, session=None):
     try:
         startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
@@ -682,6 +712,7 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
               "minDepth": depth_min,
               "maxDepth": depth_max,
               "platform": platform,
+              "qualityFlag": quality_flag,
               "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
 
     if session is not None: