You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/03/30 21:46:12 UTC

[incubator-sdap-nexus] branch SDAP-454 created (now 51421a6)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a change to branch SDAP-454
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


      at 51421a6  Added new query parameter  to matchup algorithm

This branch includes the following new commits:

     new 51421a6  Added new query parameter  to matchup algorithm

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-nexus] 01/01: Added new query parameter to matchup algorithm

Posted by sk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-454
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 51421a6a47881b07946ecbb041a0c1ce0e9af891
Author: skorper <st...@gmail.com>
AuthorDate: Thu Mar 30 14:45:42 2023 -0700

    Added new query parameter  to matchup algorithm
---
 CHANGELOG.md                                    |  1 +
 analysis/tests/algorithms_spark/test_matchup.py | 68 +++++++++++++++++++++++++
 analysis/webservice/algorithms_spark/Matchup.py | 31 ++++++++---
 analysis/webservice/apidocs/openapi.yml         | 13 +++++
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b03fda5..d76168f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them.
+- SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm
 ### Changed
 - SDAP-443:
   - Replacing DOMS terminology with CDMS terminology:
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index eba1e5f..47d00c3 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -589,3 +589,71 @@ def test_match_once_keep_duplicates():
         for point in secondary_points:
             assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
             assert point.data_id != secondary_doms_point_3.data_id
+
+
+def test_prioritize_distance():
+    """
+    Ensure that distance is prioritized over time when prioritizeDistance=True, and
+    that time is prioritized over distance when prioritizeDistance=False.
+    """
+    primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z',
+                                   depth=None, data_id='primary')
+    # Close in space, far in time
+    secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-08T00:00:00Z',
+                                       depth=-1, data_id='secondary1')
+    # Far in space, close in time
+    secondary_doms_point_2 = DomsPoint(longitude=90.0, latitude=90.0, time='2017-07-01T00:00:01Z',
+                                       depth=-1, data_id='secondary2')
+
+    primary_doms_point.data = []
+    secondary_doms_point_1.data = []
+    secondary_doms_point_2.data = []
+
+    patch_generators = [
+        (primary_doms_point, secondary_doms_point_1),
+        (primary_doms_point, secondary_doms_point_2),
+    ]
+
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+
+    with mock.patch(
+            'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+    ) as mock_match_satellite_to_insitu, mock.patch(
+        'webservice.algorithms_spark.Matchup.determine_parallelism'
+    ) as mock_determine_parallelism:
+        # Mock the actual call to generate a matchup. Hardcode response
+        # to test this scenario
+        mock_match_satellite_to_insitu.return_value = patch_generators
+        mock_determine_parallelism.return_value = 1
+
+        match_params = {
+            'tile_ids': ['test'],
+            'bounding_wkt': '',
+            'primary_ds_name': '',
+            'secondary_ds_names': '',
+            'parameter': '',
+            'depth_min': 0,
+            'depth_max': 0,
+            'time_tolerance': 2000000,
+            'radius_tolerance': 0,
+            'platforms': '',
+            'match_once': True,
+            'tile_service_factory': lambda x: None,
+            'prioritize_distance': True,
+            'sc': spark_context
+        }
+
+        match_result = spark_matchup_driver(**match_params)
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 1
+        assert secondary_points[0].data_id == secondary_doms_point_1.data_id
+
+        match_params['prioritize_distance'] = False
+
+        match_result = spark_matchup_driver(**match_params)
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 1
+        assert secondary_points[0].data_id == secondary_doms_point_2.data_id
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 1274b64..25251cd 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -142,6 +142,13 @@ class Matchup(NexusCalcSparkHandler):
                            "If the number of primary matches is greater than this limit, the service will respond with "
                            "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
                            "Default: 500"
+        },
+        "prioritizeDistance": {
+            "name": "Prioritize distance",
+            "type": "boolean",
+            "description": "If true, prioritize distance over time when computing matches. If false, prioritize time over "
+                           "distance. This is only relevant if matchOnce=true, because otherwise all matches will be "
+                           "included so long as they fit within the user-provided tolerances. Default is true."
         }
     }
     singleton = True
@@ -214,10 +221,13 @@ class Matchup(NexusCalcSparkHandler):
         start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
 
+        prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True)
+
+
         return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
                start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
                depth_min, depth_max, time_tolerance, radius_tolerance, \
-               platforms, match_once, result_size_limit
+               platforms, match_once, result_size_limit, prioritize_distance
 
     def calc(self, request, **args):
         start = datetime.utcnow()
@@ -225,7 +235,7 @@ class Matchup(NexusCalcSparkHandler):
         bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
         start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
         depth_min, depth_max, time_tolerance, radius_tolerance, \
-        platforms, match_once, result_size_limit = self.parse_arguments(request)
+        platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
 
         with ResultsStorage(self.config) as resultsStorage:
 
@@ -246,7 +256,8 @@ class Matchup(NexusCalcSparkHandler):
         try:
             spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
                                                 secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
-                                                radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc)
+                                                radius_tolerance, platforms, match_once, self.tile_service_factory,
+                                                prioritize_distance, sc=self._sc)
         except Exception as e:
             self.log.exception(e)
             raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
@@ -564,7 +575,7 @@ DRIVER_LOCK = Lock()
 
 
 def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
-                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, sc=None):
+                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None):
     from functools import partial
 
     with DRIVER_LOCK:
@@ -611,12 +622,14 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
         # Method used for calculating the distance between 2 DomsPoints
         from pyproj import Geod
 
-        def dist(primary, matchup):
+        def dist(primary, matchup, prioritize_distance):
             wgs84_geod = Geod(ellps='WGS84')
             lat1, lon1 = (primary.latitude, primary.longitude)
             lat2, lon2 = (matchup.latitude, matchup.longitude)
             az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
-            return distance, time_dist(primary, matchup)
+            if prioritize_distance:
+                return distance, time_dist(primary, matchup)
+            return time_dist(primary, matchup), distance
 
         def time_dist(primary, matchup):
             primary_time = iso_time_to_epoch(primary.time)
@@ -645,7 +658,11 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
 
         rdd_filtered = rdd_filtered.map(
             lambda primary_matchup: tuple(
-                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+                [primary_matchup[0], tuple([primary_matchup[1], dist(
+                    primary_matchup[0],
+                    primary_matchup[1],
+                    prioritize_distance
+                )])]
             )).combineByKey(
                 lambda value: [value],
                 lambda value_list, value: value_list + [value],
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 55802cd..206278e 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -167,6 +167,19 @@ paths:
             type: integer
             default: 500
           example: 500
+        - in: query
+          name: prioritizeDistance
+          description: |
+            If true, prioritize distance over time when computing matches. 
+            If false, prioritize time over distance. This is only relevant if 
+            matchOnce=true, because otherwise all matches will be included 
+            so long as they fit within the user-provided tolerances. 
+            Default is true.
+          required: false
+          schema:
+            type: boolean
+            default: true
+          example: true
       responses:
         '200':
           description: Successful operation