You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by fg...@apache.org on 2019/08/20 18:18:26 UTC
[incubator-sdap-nexus] branch 1.1.0-SNAPSHOT updated: SDAP-144 add
quality flag support for DOMS (#74)
This is an automated email from the ASF dual-hosted git repository.
fgreg pushed a commit to branch 1.1.0-SNAPSHOT
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/1.1.0-SNAPSHOT by this push:
new 1932a69 SDAP-144 add quality flag support for DOMS (#74)
1932a69 is described below
commit 1932a69cfe9a66ba45faf2e8ba3d0e18bebd4a08
Author: echyam <ec...@users.noreply.github.com>
AuthorDate: Tue Aug 20 11:18:22 2019 -0700
SDAP-144 add quality flag support for DOMS (#74)
* add quality flag support for DOMS issue SDAP-144
* Formatting tweaks and a couple of bug fixes. Allow for None qualityFlag API parameter. Change from wind_direction_quality to wind_component_quality
---
.../algorithms/doms/DomsInitialization.py | 7 +++-
.../webservice/algorithms/doms/ResultsStorage.py | 33 ++++++++++-----
analysis/webservice/algorithms_spark/Matchup.py | 49 ++++++++++++++++++----
3 files changed, 68 insertions(+), 21 deletions(-)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 21832c1..65d5009 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -103,7 +103,8 @@ class DomsInitializer:
end_time timestamp,
platforms text,
bounding_box text,
- parameter text
+ parameter text,
+ quality_flag int
);
"""
session.execute(cql)
@@ -122,6 +123,10 @@ class DomsInitializer:
y decimal,
source_dataset text,
measurement_time timestamp,
+ wind_speed_quality int,
+ wind_component_quality int,
+ sst_quality int,
+ sss_quality int,
platform text,
device text,
measurement_values map<text, decimal>,
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 03bbd09..7b5de10 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -14,7 +14,6 @@
# limitations under the License.
-
import ConfigParser
import logging
import uuid
@@ -86,9 +85,9 @@ class ResultsStorage(AbstractResultsContainer):
def __insertParams(self, execution_id, params):
cql = """INSERT INTO doms_params
- (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
+ (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter, quality_flag)
VALUES
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
self._session.execute(cql, (execution_id,
params["primary"],
@@ -102,7 +101,8 @@ class ResultsStorage(AbstractResultsContainer):
params["endTime"],
params["platforms"],
params["bbox"],
- params["parameter"]
+ params["parameter"],
+ params["qualityFlag"] if "qualityFlag" in params.keys() else None
))
def __insertStats(self, execution_id, stats):
@@ -125,9 +125,9 @@ class ResultsStorage(AbstractResultsContainer):
cql = """
INSERT INTO doms_data
- (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
+ (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, wind_speed_quality, wind_component_quality, sst_quality, sss_quality, platform, device, measurement_values, is_primary)
VALUES
- (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insertStatement = self._session.prepare(cql)
batch = BatchStatement()
@@ -150,6 +150,10 @@ class ResultsStorage(AbstractResultsContainer):
result["y"],
result["source"],
result["time"],
+ result["wind_speed_quality"],
+ result["wind_component_quality"],
+ result["sea_water_temperature_quality"],
+ result["sea_water_salinity_quality"],
result["platform"] if "platform" in result else None,
result["device"] if "device" in result else None,
dataMap,
@@ -178,8 +182,8 @@ class ResultsStorage(AbstractResultsContainer):
dataMap = {}
for name in result:
value = result[name]
- if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type(
- value) in [float, int]:
+ if name not in ["id", "x", "y", "source", "time", "platform", "device", "point",
+ "matches"] and value is not None and type(value) in [float, int]:
dataMap[name] = value
return dataMap
@@ -223,6 +227,7 @@ class ResultsRetrieval(AbstractResultsContainer):
dataMap = {}
for row in rows:
entry = self.__rowToDataEntry(row, trim_data=trim_data)
+
dataMap[row.value_id] = entry
return dataMap
@@ -242,11 +247,16 @@ class ResultsRetrieval(AbstractResultsContainer):
"source": row.source_dataset,
"device": row.device,
"platform": row.platform,
- "time": row.measurement_time.replace(tzinfo=UTC)
+ "time": row.measurement_time.replace(tzinfo=UTC),
+ "wind_speed_quality": row.wind_speed_quality,
+ "wind_component_quality": row.wind_component_quality,
+ "sea_water_temperature_quality": row.sst_quality,
+ "sea_water_salinity_quality": row.sss_quality
}
+
for key in row.measurement_values:
value = float(row.measurement_values[key])
- entry[key] = value
+ entry[key] = entry[key] if key in entry else value
return entry
def __retrieveStats(self, id):
@@ -279,7 +289,8 @@ class ResultsRetrieval(AbstractResultsContainer):
"endTime": row.end_time.replace(tzinfo=UTC),
"platforms": row.platforms,
"bbox": row.bounding_box,
- "parameter": row.parameter
+ "parameter": row.parameter,
+ "qualityFlag": row.quality_flag
}
return params
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index cd21f17..6fc5905 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -31,7 +31,6 @@ from shapely import wkt
from shapely.geometry import Point
from shapely.geometry import box
from shapely.geos import ReadingError
-
from webservice.NexusHandler import SparkHandler, nexus_handler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.algorithms.doms import values as doms_values
@@ -70,6 +69,12 @@ class Matchup(SparkHandler):
"type": "string",
"description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
},
+ "qualityFlag": {
+ "name": "In-Situ Quality Flag",
+ "type": "int",
+ "description": "The quality of in-situ data to be used for the match up. It must be an IODE primary level "
+ "flag. Optional. Default: no quality filter applied"
+ },
"startTime": {
"name": "Start Time",
"type": "string",
@@ -180,6 +185,11 @@ class Matchup(SparkHandler):
raise NexusProcessingException(
reason="Depth Min should be less than Depth Max", code=400)
+ quality_flag = request.get_int_arg('qualityFlag', default=None)
+ if quality_flag not in [1, 2, 3, 4, 9, None]:
+ raise NexusProcessingException(
+ reason="Quality flag not a IODE primary level flag value", code=400)
+
time_tolerance = request.get_int_arg('tt', default=86400)
radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
platforms = request.get_argument('platforms', None)
@@ -201,7 +211,7 @@ class Matchup(SparkHandler):
return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
- depth_min, depth_max, time_tolerance, radius_tolerance, \
+ depth_min, depth_max, quality_flag, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit
def calc(self, request, **args):
@@ -209,7 +219,7 @@ class Matchup(SparkHandler):
# TODO Assuming Satellite primary
bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
- depth_min, depth_max, time_tolerance, radius_tolerance, \
+ depth_min, depth_max, quality_flag, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit = self.parse_arguments(request)
with ResultsStorage() as resultsStorage:
@@ -229,7 +239,8 @@ class Matchup(SparkHandler):
self.log.debug("Calling Spark Driver")
try:
spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
- matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+ matchup_ds_names, parameter_s, depth_min, depth_max, quality_flag,
+ time_tolerance,
radius_tolerance, platforms, match_once, sc=self._sc)
except Exception as e:
self.log.exception(e)
@@ -247,7 +258,7 @@ class Matchup(SparkHandler):
"timeTolerance": time_tolerance,
"radiusTolerance": float(radius_tolerance),
"platforms": platforms,
- "parameter": parameter_s
+ "parameter": parameter_s,
}
if depth_min is not None:
@@ -256,6 +267,9 @@ class Matchup(SparkHandler):
if depth_max is not None:
args["depthMax"] = float(depth_max)
+ if quality_flag is not None:
+ args["qualityFlag"] = int(quality_flag)
+
total_keys = len(spark_result.keys())
total_values = sum(len(v) for v in spark_result.itervalues())
details = {
@@ -301,10 +315,14 @@ class Matchup(SparkHandler):
return {
"sea_water_temperature": domspoint.sst,
"sea_water_temperature_depth": domspoint.sst_depth,
+ "sea_water_temperature_quality": domspoint.sst_quality,
"sea_water_salinity": domspoint.sss,
"sea_water_salinity_depth": domspoint.sss_depth,
+ "sea_water_salinity_quality": domspoint.sss_quality,
"wind_speed": domspoint.wind_speed,
"wind_direction": domspoint.wind_direction,
+ "wind_speed_quality": domspoint.wind_speed_quality,
+ "wind_component_quality": domspoint.wind_component_quality,
"wind_u": domspoint.wind_u,
"wind_v": domspoint.wind_v,
"platform": doms_values.getPlatformById(domspoint.platform),
@@ -332,10 +350,14 @@ class DomsPoint(object):
self.wind_v = None
self.wind_direction = None
self.wind_speed = None
+ self.wind_speed_quality = None
+ self.wind_component_quality = None
self.sst = None
self.sst_depth = None
+ self.sst_quality = None
self.sss = None
self.sss_depth = None
+ self.sss_quality = None
self.source = None
self.depth = None
self.platform = None
@@ -414,10 +436,14 @@ class DomsPoint(object):
point.wind_v = edge_point.get('northward_wind')
point.wind_direction = edge_point.get('wind_direction')
point.wind_speed = edge_point.get('wind_speed')
+ point.wind_speed_quality = edge_point.get('wind_speed_quality')
+ point.wind_component_quality = edge_point.get('wind_component_quality')
point.sst = edge_point.get('sea_water_temperature')
point.sst_depth = edge_point.get('sea_water_temperature_depth')
+ point.sst_quality = edge_point.get('sea_water_temperature_quality')
point.sss = edge_point.get('sea_water_salinity')
point.sss_depth = edge_point.get('sea_water_salinity_depth')
+ point.sss_quality = edge_point.get('sea_water_salinity_quality')
point.source = edge_point.get('source')
point.platform = edge_point.get('platform')
point.device = edge_point.get('device')
@@ -437,6 +463,7 @@ DRIVER_LOCK = Lock()
def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+ quality_flag,
time_tolerance, radius_tolerance, platforms, match_once, sc=None):
from functools import partial
@@ -446,6 +473,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_nam
matchup_b = sc.broadcast(matchup_ds_names)
depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+ quality_flag_b = sc.broadcast(int(quality_flag) if quality_flag is not None else None)
tt_b = sc.broadcast(time_tolerance)
rt_b = sc.broadcast(float(radius_tolerance))
platforms_b = sc.broadcast(platforms)
@@ -459,7 +487,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_nam
rdd_filtered = rdd.mapPartitions(
partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
- depth_max_b=depth_max_b), preservesPartitioning=True) \
+ depth_max_b=depth_max_b, quality_flag_b=quality_flag_b), preservesPartitioning=True) \
.filter(lambda p_m_tuple: abs(
iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
@@ -527,7 +555,7 @@ def add_meters_to_lon_lat(lon, lat, meters):
def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
- bounding_wkt_b, depth_min_b, depth_max_b):
+ bounding_wkt_b, depth_min_b, depth_max_b, quality_flag_b):
the_time = datetime.now()
tile_ids = list(tile_ids)
if len(tile_ids) == 0:
@@ -571,7 +599,8 @@ def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b,
bbox = ','.join(
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
- platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+ platforms_b.value, depth_min_b.value, depth_max_b.value, quality_flag_b.value,
+ session=edge_session)
if edge_response['totalResults'] == 0:
continue
r = edge_response['results']
@@ -655,7 +684,8 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
yield p_doms_point, m_doms_point
-def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, quality_flag,
+ itemsPerPage=1000,
startIndex=0, stats=True, session=None):
try:
startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
@@ -682,6 +712,7 @@ def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min,
"minDepth": depth_min,
"maxDepth": depth_max,
"platform": platform,
+ "qualityFlag": quality_flag,
"itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
if session is not None: