Posted to commits@sdap.apache.org by sk...@apache.org on 2022/09/28 00:08:37 UTC

[incubator-sdap-nexus] branch feature/SDAP-402 updated (3194930 -> c6fd041)

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a change to branch feature/SDAP-402
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


    from 3194930  test data update to fix matchup test
     new 6f6e102  Updated matchup matchOnce logic
     new c6fd041  Added new test case for matchOnce fix

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 analysis/tests/algorithms_spark/test_matchup.py | 61 ++++++++++++++++++++++++-
 analysis/webservice/algorithms_spark/Matchup.py | 46 ++++++++++++++++---
 2 files changed, 99 insertions(+), 8 deletions(-)
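
In short, the matchOnce path previously kept exactly one secondary match per
primary point (the closest by the computed distance), which dropped co-located
duplicates such as measurements at different depths. The updated logic keeps
every secondary match sharing the closest match's exact latitude/longitude/time.
A minimal pure-Python sketch of the difference, using hypothetical
(lat, lon, time, distance) tuples in place of DomsPoint objects:

    # Hypothetical secondary matches for one primary point; the values
    # echo the fixtures in the new test below.
    matches = [
        (2.0, 2.0, 100, 5),      # closest match
        (2.0, 2.0, 100, 5),      # co-located duplicate (e.g. different depth)
        (50.0, 100.0, 900, 70),  # farther match
    ]

    # Old behavior (reduceByKey): a single closest match survives.
    old_result = [min(matches, key=lambda m: m[3])]

    # New behavior: keep all matches at the closest match's space/time.
    closest = min(matches, key=lambda m: m[3])
    new_result = [m for m in matches if m[:3] == closest[:3]]

    assert len(old_result) == 1
    assert len(new_result) == 2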


[incubator-sdap-nexus] 01/02: Updated matchup matchOnce logic

Posted by sk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch feature/SDAP-402
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 6f6e102dc18d92037c52fb766853094ed8260bf4
Author: skorper <st...@gmail.com>
AuthorDate: Tue Sep 27 17:05:45 2022 -0700

    Updated matchup matchOnce logic
---
 analysis/webservice/algorithms_spark/Matchup.py | 46 +++++++++++++++++++++----
 1 file changed, 39 insertions(+), 7 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 5dafb31..4bd33cf 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -349,7 +349,6 @@ class DomsPoint(object):
         self.data = None
 
         self.source = None
-        self.depth = None
         self.platform = None
         self.device = None
         self.file_url = None
@@ -558,7 +557,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
         parameter_b = sc.broadcast(parameter)
 
         # Parallelize list of tile ids
-        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
 
     # Map Partitions ( list(tile_id) )
     rdd_filtered = rdd.mapPartitions(
@@ -601,10 +600,43 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
             matchup_time = iso_time_to_epoch(matchup.time)
             return abs(primary_time - matchup_time)
 
-        rdd_filtered = rdd_filtered \
-            .map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \
-            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
-            .mapValues(lambda x: [x[0]])
+        def filter_closest(matches):
+            """
+            Filter given matches. Find the closest match to the primary
+            point and only keep other matches that match the same
+            time/space as that point.
+
+            :param matches: List of match tuples. Each tuple has the following format:
+                1. The secondary match
+                2. Tuple of form (space_dist, time_dist)
+            """
+            closest_point = min(matches, key=lambda match: match[1])[0]
+            matches = list(filter(
+                lambda match: match.latitude == closest_point.latitude and
+                              match.longitude == closest_point.longitude and
+                              match.time == closest_point.time, map(
+                    lambda match: match[0], matches
+                )
+            ))
+            return matches
+
+        rez = rdd_filtered.collect()
+
+        print(f'match_debugging: {type(rez)}')
+        print(f'match_debugging: {len(rez)}')
+        print(f'match_debugging: {type(rez[-1])}')
+        print(f'match_debugging: {len(rez[-1])}')
+        print(f'match_debugging: {type(rez[0][0])}')
+        print(f'match_debugging: {type(rez[0][1])}')
+
+        rdd_filtered = rdd_filtered.map(
+            lambda primary_matchup: tuple(
+                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+            )).combineByKey(
+                lambda value: [value],
+                lambda value_list, value: value_list + [value],
+                lambda value_list_a, value_list_b: value_list_a + value_list_b
+            ).mapValues(lambda matches: filter_closest(matches))
     else:
         rdd_filtered = rdd_filtered \
             .combineByKey(lambda value: [value],  # Create 1 element list
@@ -616,7 +648,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
     return result_as_map
 
 
-def determine_parllelism(num_tiles):
+def determine_parallelism(num_tiles):
     """
     Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
     Also, don't go below the default of 8

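The core of the change is a group-then-filter pattern: combineByKey builds the
full list of matches per primary point, and mapValues applies filter_closest to
each list. A runnable sketch of that pattern, assuming a local pyspark
installation and using plain (lat, lon, time) tuples as stand-ins for
DomsPoint objects:

    from pyspark import SparkContext

    sc = SparkContext('local[1]', 'match-once-sketch')

    # (primary_key, (secondary_point, distance)) pairs, one per match.
    pairs = sc.parallelize([
        ('primary', ((2.0, 2.0, 100), 5)),
        ('primary', ((2.0, 2.0, 100), 5)),
        ('primary', ((50.0, 100.0, 900), 70)),
    ])

    def filter_closest(matches):
        # Keep every match sharing the closest match's space/time,
        # mirroring what filter_closest in Matchup.py does with the
        # latitude/longitude/time fields.
        closest_point = min(matches, key=lambda m: m[1])[0]
        return [point for point, _ in matches if point == closest_point]

    grouped = pairs.combineByKey(
        lambda value: [value],                           # create 1-element list
        lambda value_list, value: value_list + [value],  # append within a partition
        lambda list_a, list_b: list_a + list_b           # merge across partitions
    ).mapValues(filter_closest)

    print(grouped.collect())  # [('primary', [(2.0, 2.0, 100), (2.0, 2.0, 100)])]

Building per-key lists with combineByKey is equivalent to grouping all matches
per primary point before filtering; the lists are simply assembled incrementally
within and across partitions.
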

[incubator-sdap-nexus] 02/02: Added new test case for matchOnce fix

Posted by sk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch feature/SDAP-402
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit c6fd041b78710fe098f6cf5f9619e1faa73eecb9
Author: skorper <st...@gmail.com>
AuthorDate: Tue Sep 27 17:08:07 2022 -0700

    Added new test case for matchOnce fix
---
 analysis/tests/algorithms_spark/test_matchup.py | 61 ++++++++++++++++++++++++-
 1 file changed, 60 insertions(+), 1 deletion(-)

diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index 318655c..eba1e5f 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -27,7 +27,7 @@ from nexustiles.model.nexusmodel import Tile, TileVariable
 from pyspark.sql import SparkSession
 from shapely import wkt
 from shapely.geometry import box
-from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint, spark_matchup_driver
 
 
 class MockSparkParam:
@@ -530,3 +530,62 @@ def test_multi_variable_satellite_to_satellite_matchup(test_dir, test_tile, test
         assert len(matchup_result[0][1].data) == 2
         assert len(matchup_result[1][0].data) == 2
         assert len(matchup_result[1][1].data) == 2
+
+
+def test_match_once_keep_duplicates():
+    """
+    Ensure duplicate points (in space and time) are maintained when
+    matchup is called with matchOnce=True. Multiple points with the
+    same space/time should be kept even if they have different
+    depth/devices
+    """
+    primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z', depth=None, data_id = 'primary')
+    secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-2, data_id = 'secondary1')
+    secondary_doms_point_2 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-3, data_id = 'secondary2')
+    secondary_doms_point_3 = DomsPoint(longitude=100.0, latitude=50.0, time='2017-07-05T00:00:00Z', depth=0, data_id = 'secondary3')
+
+    primary_doms_point.data = []
+    secondary_doms_point_1.data = []
+    secondary_doms_point_2.data = []
+    secondary_doms_point_3.data = []
+
+    patch_generators = [
+        (primary_doms_point, secondary_doms_point_3),
+        (primary_doms_point, secondary_doms_point_2),
+        (primary_doms_point, secondary_doms_point_1)
+    ]
+
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+
+    with mock.patch(
+        'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+    ) as mock_match_satellite_to_insitu, mock.patch(
+        'webservice.algorithms_spark.Matchup.determine_parallelism'
+    ) as mock_determine_parallelism:
+        # Mock the actual call to generate a matchup. Hardcode response
+        # to test this scenario
+        mock_match_satellite_to_insitu.return_value = patch_generators
+        mock_determine_parallelism.return_value = 1
+
+        match_result = spark_matchup_driver(
+            tile_ids=['test'],
+            bounding_wkt='',
+            primary_ds_name='',
+            secondary_ds_names='',
+            parameter='',
+            depth_min=0,
+            depth_max=0,
+            time_tolerance=2000000,
+            radius_tolerance=0,
+            platforms='',
+            match_once=True,
+            tile_service_factory=lambda x: None,
+            sc=spark_context
+        )
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 2
+        for point in secondary_points:
+            assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
+            assert point.data_id != secondary_doms_point_3.data_id
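
The new test pins down the expected shape of the driver's return value. A
minimal sketch of that shape, with the test's data_id strings standing in for
DomsPoint objects:

    # With match_once=True, the driver returns one entry per primary
    # point, whose value holds the closest secondary match plus any
    # exact space/time duplicates of it; farther matches are dropped.
    match_result = {
        'primary': ['secondary1', 'secondary2'],
    }
    assert len(match_result) == 1
    assert all(p != 'secondary3' for p in match_result['primary'])

In a checkout with pyspark and the analysis package installed, the case should
be runnable on its own with pytest against
analysis/tests/algorithms_spark/test_matchup.py.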