You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/03/30 21:46:13 UTC
[incubator-sdap-nexus] 01/01: Added new query parameter to matchup algorithm
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch SDAP-454
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 51421a6a47881b07946ecbb041a0c1ce0e9af891
Author: skorper <st...@gmail.com>
AuthorDate: Thu Mar 30 14:45:42 2023 -0700
Added new query parameter to matchup algorithm
---
CHANGELOG.md | 1 +
analysis/tests/algorithms_spark/test_matchup.py | 68 +++++++++++++++++++++++++
analysis/webservice/algorithms_spark/Matchup.py | 31 ++++++++---
analysis/webservice/apidocs/openapi.yml | 13 +++++
4 files changed, 106 insertions(+), 7 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b03fda5..d76168f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Deletebyquery: Parameter to set the number of rows to fetch from Solr. Reduces the time needed to gather tiles to delete, especially when there are many of them.
+- SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm
### Changed
- SDAP-443:
- Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index eba1e5f..47d00c3 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -589,3 +589,71 @@ def test_match_once_keep_duplicates():
for point in secondary_points:
assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
assert point.data_id != secondary_doms_point_3.data_id
+
+
+def test_prioritize_distance():
+ """
+ Ensure that distance is prioritized over time when prioritizeDistance=True, and
+ that time is prioritized over distance when prioritizeDistance=False.
+ """
+ primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z',
+ depth=None, data_id='primary')
+ # Close in space, far in time
+ secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-08T00:00:00Z',
+ depth=-1, data_id='secondary1')
+ # Far in space, close in time
+ secondary_doms_point_2 = DomsPoint(longitude=90.0, latitude=90.0, time='2017-07-01T00:00:01Z',
+ depth=-1, data_id='secondary2')
+
+ primary_doms_point.data = []
+ secondary_doms_point_1.data = []
+ secondary_doms_point_2.data = []
+
+ patch_generators = [
+ (primary_doms_point, secondary_doms_point_1),
+ (primary_doms_point, secondary_doms_point_2),
+ ]
+
+ spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+ spark_context = spark.sparkContext
+
+ with mock.patch(
+ 'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+ ) as mock_match_satellite_to_insitu, mock.patch(
+ 'webservice.algorithms_spark.Matchup.determine_parallelism'
+ ) as mock_determine_parallelism:
+ # Mock the actual call to generate a matchup. Hardcode response
+ # to test this scenario
+ mock_match_satellite_to_insitu.return_value = patch_generators
+ mock_determine_parallelism.return_value = 1
+
+ match_params = {
+ 'tile_ids': ['test'],
+ 'bounding_wkt': '',
+ 'primary_ds_name': '',
+ 'secondary_ds_names': '',
+ 'parameter': '',
+ 'depth_min': 0,
+ 'depth_max': 0,
+ 'time_tolerance': 2000000,
+ 'radius_tolerance': 0,
+ 'platforms': '',
+ 'match_once': True,
+ 'tile_service_factory': lambda x: None,
+ 'prioritize_distance': True,
+ 'sc': spark_context
+ }
+
+ match_result = spark_matchup_driver(**match_params)
+ assert len(match_result) == 1
+ secondary_points = match_result[list(match_result.keys())[0]]
+ assert len(secondary_points) == 1
+ assert secondary_points[0].data_id == secondary_doms_point_1.data_id
+
+ match_params['prioritize_distance'] = False
+
+ match_result = spark_matchup_driver(**match_params)
+ assert len(match_result) == 1
+ secondary_points = match_result[list(match_result.keys())[0]]
+ assert len(secondary_points) == 1
+ assert secondary_points[0].data_id == secondary_doms_point_2.data_id
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..25251cd 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -142,6 +142,13 @@ class Matchup(NexusCalcSparkHandler):
"If the number of primary matches is greater than this limit, the service will respond with "
"(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
"Default: 500"
+ },
+ "prioritizeDistance": {
+ "name": "Prioritize distance",
+ "type": "boolean",
+ "description": "If true, prioritize distance over time when computing matches. If false, prioritize time over "
+ "distance. This is only relevant if matchOnce=true, because otherwise all matches will be "
+ "included so long as they fit within the user-provided tolerances. Default is true."
}
}
singleton = True
@@ -214,10 +221,13 @@ class Matchup(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
+ prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True)
+
+
return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
depth_min, depth_max, time_tolerance, radius_tolerance, \
- platforms, match_once, result_size_limit
+ platforms, match_once, result_size_limit, prioritize_distance
def calc(self, request, **args):
start = datetime.utcnow()
@@ -225,7 +235,7 @@ class Matchup(NexusCalcSparkHandler):
bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
depth_min, depth_max, time_tolerance, radius_tolerance, \
- platforms, match_once, result_size_limit = self.parse_arguments(request)
+ platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
with ResultsStorage(self.config) as resultsStorage:
@@ -246,7 +256,8 @@ class Matchup(NexusCalcSparkHandler):
try:
spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
- radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
+ radius_tolerance, platforms, match_once, self.tile_service_factory,
+ prioritize_distance, sc=self._sc)
except Exception as e:
self.log.exception(e)
raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
@@ -564,7 +575,7 @@ DRIVER_LOCK = Lock()
def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
- time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, sc=None):
+ time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None):
from functools import partial
with DRIVER_LOCK:
@@ -611,12 +622,14 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
# Method used for calculating the distance between 2 DomsPoints
from pyproj import Geod
- def dist(primary, matchup):
+ def dist(primary, matchup, prioritize_distance):
wgs84_geod = Geod(ellps='WGS84')
lat1, lon1 = (primary.latitude, primary.longitude)
lat2, lon2 = (matchup.latitude, matchup.longitude)
az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
- return distance, time_dist(primary, matchup)
+ if prioritize_distance:
+ return distance, time_dist(primary, matchup)
+ return time_dist(primary, matchup), distance
def time_dist(primary, matchup):
primary_time = iso_time_to_epoch(primary.time)
@@ -645,7 +658,11 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
rdd_filtered = rdd_filtered.map(
lambda primary_matchup: tuple(
- [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+ [primary_matchup[0], tuple([primary_matchup[1], dist(
+ primary_matchup[0],
+ primary_matchup[1],
+ prioritize_distance
+ )])]
)).combineByKey(
lambda value: [value],
lambda value_list, value: value_list + [value],
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..206278e 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -167,6 +167,19 @@ paths:
type: integer
default: 500
example: 500
+ - in: query
+ name: prioritizeDistance
+ description: |
+ If true, prioritize distance over time when computing matches.
+ If false, prioritize time over distance. This is only relevant if
+ matchOnce=true, because otherwise all matches will be included
+ so long as they fit within the user-provided tolerances.
+ Default is true.
+ required: false
+ schema:
+ type: boolean
+ default: true
+ example: true
responses:
'200':
description: Successful operation