You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/03/30 21:46:13 UTC

[incubator-sdap-nexus] 01/01: Added new query parameter to matchup algorithm

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-454
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 51421a6a47881b07946ecbb041a0c1ce0e9af891
Author: skorper <st...@gmail.com>
AuthorDate: Thu Mar 30 14:45:42 2023 -0700

    Added new query parameter  to matchup algorithm
---
 CHANGELOG.md                                    |  1 +
 analysis/tests/algorithms_spark/test_matchup.py | 68 +++++++++++++++++++++++++
 analysis/webservice/algorithms_spark/Matchup.py | 31 ++++++++---
 analysis/webservice/apidocs/openapi.yml         | 13 +++++
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b03fda5..d76168f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them.
+- SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm
 ### Changed
 - SDAP-443:
   - Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index eba1e5f..47d00c3 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -589,3 +589,71 @@ def test_match_once_keep_duplicates():
         for point in secondary_points:
             assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
             assert point.data_id != secondary_doms_point_3.data_id
+
+
+def test_prioritize_distance():
+    """
+    Ensure that distance is prioritized over time when prioritizeDistance=True, and
+    that time is prioritized over distance when prioritizeDistance=False.
+    """
+    primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z',
+                                   depth=None, data_id='primary')
+    # Close in space, far in time
+    secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-08T00:00:00Z',
+                                       depth=-1, data_id='secondary1')
+    # Far in space, close in time
+    secondary_doms_point_2 = DomsPoint(longitude=90.0, latitude=90.0, time='2017-07-01T00:00:01Z',
+                                       depth=-1, data_id='secondary2')
+
+    primary_doms_point.data = []
+    secondary_doms_point_1.data = []
+    secondary_doms_point_2.data = []
+
+    patch_generators = [
+        (primary_doms_point, secondary_doms_point_1),
+        (primary_doms_point, secondary_doms_point_2),
+    ]
+
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+
+    with mock.patch(
+            'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+    ) as mock_match_satellite_to_insitu, mock.patch(
+        'webservice.algorithms_spark.Matchup.determine_parallelism'
+    ) as mock_determine_parallelism:
+        # Mock the actual call to generate a matchup. Hardcode response
+        # to test this scenario
+        mock_match_satellite_to_insitu.return_value = patch_generators
+        mock_determine_parallelism.return_value = 1
+
+        match_params = {
+            'tile_ids': ['test'],
+            'bounding_wkt': '',
+            'primary_ds_name': '',
+            'secondary_ds_names': '',
+            'parameter': '',
+            'depth_min': 0,
+            'depth_max': 0,
+            'time_tolerance': 2000000,
+            'radius_tolerance': 0,
+            'platforms': '',
+            'match_once': True,
+            'tile_service_factory': lambda x: None,
+            'prioritize_distance': True,
+            'sc': spark_context
+        }
+
+        match_result = spark_matchup_driver(**match_params)
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 1
+        assert secondary_points[0].data_id == secondary_doms_point_1.data_id
+
+        match_params['prioritize_distance'] = False
+
+        match_result = spark_matchup_driver(**match_params)
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 1
+        assert secondary_points[0].data_id == secondary_doms_point_2.data_id
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..25251cd 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -142,6 +142,13 @@ class Matchup(NexusCalcSparkHandler):
                            "If the number of primary matches is greater than this limit, the service will respond with "
                            "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
                            "Default: 500"
+        },
+        "prioritizeDistance": {
+            "name": "Prioritize distance",
+            "type": "boolean",
+            "description": "If true, prioritize distance over time when computing matches. If false, prioritize time over "
+                           "distance. This is only relevant if matchOnce=true, because otherwise all matches will be "
+                           "included so long as they fit within the user-provided tolerances. Default is true."
         }
     }
     singleton = True
@@ -214,10 +221,13 @@ class Matchup(NexusCalcSparkHandler):
         start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
 
+        prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True)
+
+
         return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
                start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
                depth_min, depth_max, time_tolerance, radius_tolerance, \
-               platforms, match_once, result_size_limit
+               platforms, match_once, result_size_limit, prioritize_distance
 
     def calc(self, request, **args):
         start = datetime.utcnow()
@@ -225,7 +235,7 @@ class Matchup(NexusCalcSparkHandler):
         bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
         start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
         depth_min, depth_max, time_tolerance, radius_tolerance, \
-        platforms, match_once, result_size_limit = self.parse_arguments(request)
+        platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
 
         with ResultsStorage(self.config) as resultsStorage:
 
@@ -246,7 +256,8 @@ class Matchup(NexusCalcSparkHandler):
         try:
             spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
                                                 secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
-                                                radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
+                                                radius_tolerance, platforms, match_once, self.tile_service_factory,
+                                                prioritize_distance, sc=self._sc)
         except Exception as e:
             self.log.exception(e)
             raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
@@ -564,7 +575,7 @@ DRIVER_LOCK = Lock()
 
 
 def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
-                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, sc=None):
+                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None):
     from functools import partial
 
     with DRIVER_LOCK:
@@ -611,12 +622,14 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
         # Method used for calculating the distance between 2 DomsPoints
         from pyproj import Geod
 
-        def dist(primary, matchup):
+        def dist(primary, matchup, prioritize_distance):
             wgs84_geod = Geod(ellps='WGS84')
             lat1, lon1 = (primary.latitude, primary.longitude)
             lat2, lon2 = (matchup.latitude, matchup.longitude)
             az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
-            return distance, time_dist(primary, matchup)
+            if prioritize_distance:
+                return distance, time_dist(primary, matchup)
+            return time_dist(primary, matchup), distance
 
         def time_dist(primary, matchup):
             primary_time = iso_time_to_epoch(primary.time)
@@ -645,7 +658,11 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
 
         rdd_filtered = rdd_filtered.map(
             lambda primary_matchup: tuple(
-                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+                [primary_matchup[0], tuple([primary_matchup[1], dist(
+                    primary_matchup[0],
+                    primary_matchup[1],
+                    prioritize_distance
+                )])]
             )).combineByKey(
                 lambda value: [value],
                 lambda value_list, value: value_list + [value],
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..206278e 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -167,6 +167,19 @@ paths:
             type: integer
             default: 500
           example: 500
+        - in: query
+          name: prioritizeDistance
+          description: |
+            If true, prioritize distance over time when computing matches. 
+            If false, prioritize time over distance. This is only relevant if 
+            matchOnce=true, because otherwise all matches will be included 
+            so long as they fit within the user-provided tolerances. 
+            Default is true.
+          required: false
+          schema:
+            type: boolean
+            default: true
+          example: true
       responses:
         '200':
           description: Successful operation