Posted to commits@sdap.apache.org by rk...@apache.org on 2022/12/07 23:11:42 UTC

[incubator-sdap-nexus] branch zarr-testing updated: CDMS-122 - Limited Zarr Proxy (#174)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch zarr-testing
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/zarr-testing by this push:
     new 6fd614e  CDMS-122 - Limited Zarr Proxy (#174)
6fd614e is described below

commit 6fd614e37f861c5b308d82cc9eebe352fc29ee46
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Wed Dec 7 15:11:37 2022 -0800

    CDMS-122 - Limited Zarr Proxy (#174)
    
    * Created ZarrProxy class
    
    Also configured NexusTileService to use ZarrProxy if specified in config file
    
    * Empty source for zarr testing
    
    * More imports
    
    May not all be used yet
    
    * Updated zarrproxy
    
    Brought up to the internally developed state. It should be capable of pulling from S3, and the framework for tile translation is present.
    
    * missed something in last commit
    
    * Mostly implemented Zarr -> Nexus tile translation
    
    Still to do: Determine index in time array from time in tile id
    
    * minor error
    
    * Implemented method to select time for tile data
    
    Will select the time in the zarr dataset that is nearest to the time specified in the tile id
    
    * Minor fix
    
    Incorrect regex for timestamp
    
    * CDMS-122 Fixed tile translation
    
    Working with proof of concept.
    
    Still to do: Work out metadata in Nexus tiles. For now, I'm just passing the Dataset attributes dictionary.
    
    * CDMS-122 Migrated working ZarrProxy from PoC repo
    
    Currently only works with MUR SST data, but it is ready for integrated SDAP testing.
    
    * CDMS-122 Cleaned up last commit
    
    Removed unused vars, replaced license header & misc cleanup
    
    * CDMS-122 Begin integrating ZarrProxy functionality
    
    -NTS helper functions
    -Some NCSH functions can handle ZarrProxy
    -Integrating Zarr into /matchup_spark (Not yet functional)
    
    * CDMS-122 'Working' integration with /matchup_spark
    
    Returns the same matches as when using the existing data in Cassandra; however, the returned data disagrees slightly (different variable values, etc.)
    
    * CDMS-122 Include more metadata in tile output
    
    Attempting to sync the output with the Cassandra-backed output. All that is left now is the missing data vars, but that would require handling multi-var tiles
    
    * CDMS-122: Preemptively updating the changelog
    
    So I don't forget come PR time
    
    * CDMS-122: BENCHMARKING: Mock IS API Response
    
    * start of test
    
    * Retain mock IS query
    
    * Current test code.
    
    Currently failing due to issues with moto & s3fs
    
    * CDMS-122 First working test case
    
    And start for matchup test
    
    * CDMS-122 First working test case
    
    And start for matchup test
    
    * Fix
    
    * CDMS-122 Matchup test case
    
    Currently failing
    
    * CDMS-122 Functional unit tests
    
    2 passing: ZarrProxy w/ 95% coverage
    
    * CDMS-122 Configurable dataset
    
    Untested
    
    * CDMS-122 Ensure solr is using nexusdatasets
    
    * Scope for test s3 fixture should save test time
    
    * Gitignore
    
    * Prep work to update ZarrProxy unit tests
    
    * ZarrProxy now passing all unit tests
    
    Coverage: 94%
    
    An additional test case was added that does nothing but set up the mock s3, purely as a convenience when watching the test output: it makes it easy to see when the proper tests have started running.
    
    * Confirmed changes function in local SDAP
    
    Currently mocking out solr query for dataset metadata on dataset open in ZarrProxy. Will remove this once the solr data has been updated
    
    Updated unit testing to reflect these changes (the updates would otherwise cause the unit tests to point to real AWS instead of the moto AWS mock server)
    
    * Fix unit tests
    
    Recent PR requires additional IS mock
    
    * Minor changes
    
    Changed some raised exceptions to Nexus-specific exception types
    
    Also, changed the default tag for dynamic tile ids
    
    * Working timeseries implementation w/ zarr
    
    Not thoroughly tested
    
    * Reorged tile fetching for split fetch
    
    * Cleaned up logging for ZarrProxy
    
    * Updated ds metadata mocks
    
    Co-authored-by: Riley K Kuttruff <ri...@mt-110978.jpl.nasa.gov>
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 .gitignore                                         |   3 +
 CHANGELOG.md                                       |   3 +
 analysis/webservice/algorithms/TimeSeries.py       |   5 +-
 analysis/webservice/algorithms_spark/Matchup.py    | 281 +++++++++++++--
 .../algorithms_spark/NexusCalcSparkHandler.py      |  13 +-
 .../webservice/algorithms_spark/TimeSeriesSpark.py |  19 +-
 data-access/nexustiles/dao/ZarrProxy.py            | 313 ++++++++++++++++
 data-access/nexustiles/nexustiles.py               | 122 ++++++-
 data-access/tests/data/mock_insitu_schema.json     |   1 +
 data-access/tests/data/mock_response_direct.json   |   1 +
 data-access/tests/data/mock_response_indirect.json |   1 +
 data-access/tests/mock_chla_meta.json              |  38 ++
 data-access/tests/mock_mur_meta.json               |  29 ++
 data-access/tests/mock_oisss_meta.json             |  29 ++
 data-access/tests/mock_smap_meta.json              |  29 ++
 data-access/tests/test_zarr.py                     | 398 +++++++++++++++++++++
 16 files changed, 1244 insertions(+), 41 deletions(-)

diff --git a/.gitignore b/.gitignore
index 12ab2d6..84ac529 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,6 @@
 analysis/webservice/algorithms/doms/domsconfig.ini
 data-access/nexustiles/config/datastores.ini
 venv/
+
+data-access/tests/data/zarr_test_data/*
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2256ba7..ed14507 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-407: Added depth to `/domsresults` endpoint
 - Added documentation for building SDAP docker images
   - Prepared documentation for v1.0.0 release.
+- CDMS-122: Added new `ZarrProxy` to access Zarr-formatted data stored in AWS S3
+  - Currently only supports single variable & gridded data and the `/matchup_spark` endpoint
+  - Included unit tests
 ### Changed
 - SDAP-390: Changed `/doms` to `/cdms` and `doms_reader.py` to `cdms_reader.py`
 - domslist endpoint points to AWS insitu instead of doms insitu
diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py
index f96a2f9..a5e8617 100644
--- a/analysis/webservice/algorithms/TimeSeries.py
+++ b/analysis/webservice/algorithms/TimeSeries.py
@@ -27,7 +27,10 @@ import numpy as np
 import pytz
 import shapely.geometry
 import shapely.wkt
-from backports.functools_lru_cache import lru_cache
+try:
+    from functools import lru_cache
+except ImportError:
+    from backports.functools_lru_cache import lru_cache
 from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 from scipy import stats
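
The try/except import above keeps lru_cache available on both Python 3 (stdlib functools) and Python 2 (the backports package). A minimal sketch of the pattern in isolation; the decorated function is illustrative, not part of this commit:

    try:
        from functools import lru_cache                       # Python 3 stdlib
    except ImportError:
        from backports.functools_lru_cache import lru_cache   # Python 2 backport

    @lru_cache(maxsize=128)
    def slow_square(x):
        # Stand-in for an expensive computation worth memoizing
        return x * x

    slow_square(7)  # computed once
    slow_square(7)  # served from the cache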
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 5c5ec74..d9b8870 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -226,18 +226,26 @@ class Matchup(NexusCalcSparkHandler):
         depth_min, depth_max, time_tolerance, radius_tolerance, \
         platforms, match_once, result_size_limit = self.parse_arguments(request)
 
+        if self._get_tile_service().supports_direct_bounds_to_tile():
+            self._get_tile_service().get_datastore().open_dataset(primary_ds_name)
+
         with ResultsStorage(self.config) as resultsStorage:
 
             execution_id = str(resultsStorage.insertExecution(None, start, None, None))
 
         self.log.debug("Querying for tiles in search domain")
         # Get tile ids in box
-        tile_ids = [tile.tile_id for tile in
-                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
-                                                             start_seconds_from_epoch, end_seconds_from_epoch,
-                                                             fetch_data=False, fl='id',
-                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
-                                                                   'tile_min_lat asc'], rows=5000)]
+        if not self._get_tile_service().supports_direct_bounds_to_tile():
+            tile_ids = [tile.tile_id for tile in
+                        self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                                 start_seconds_from_epoch, end_seconds_from_epoch,
+                                                                 fetch_data=False, fl='id',
+                                                                 sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                       'tile_min_lat asc'], rows=5000)]
+        else:
+            bb = bounding_polygon.bounds
+
+            tile_ids = [self._get_tile_service().bounds_to_direct_tile_id(bb[1], bb[0], bb[3], bb[2], start_time, end_time)]
 
         self.log.info('Found %s tile_ids', len(tile_ids))
         # Call spark_matchup
@@ -426,7 +434,10 @@ class DomsPoint(object):
         point.longitude = nexus_point.longitude.item()
         point.latitude = nexus_point.latitude.item()
 
-        point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        if not isinstance(nexus_point.time, np.datetime64):
+            point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        else:
+            point.time = np.datetime_as_string(np.array([nexus_point.time]), unit='s', timezone='UTC')[0]
 
         try:
             point.depth = nexus_point.depth
@@ -737,9 +748,21 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
     tile_service = tile_service_factory()
 
     # Determine the spatial temporal extents of this partition of tiles
-    tiles_bbox = tile_service.get_bounding_box(tile_ids)
-    tiles_min_time = tile_service.get_min_time(tile_ids)
-    tiles_max_time = tile_service.get_max_time(tile_ids)
+    if not tile_service.supports_direct_bounds_to_tile():
+        tiles_bbox = tile_service.get_bounding_box(tile_ids)
+        tiles_min_time = tile_service.get_min_time(tile_ids)
+        tiles_max_time = tile_service.get_max_time(tile_ids)
+    else:
+        from dateutil import parser
+        from nexustiles.dao.ZarrProxy import ZarrProxy
+
+        parts = ZarrProxy.parse_tile_id_to_bounds(tile_ids[0])
+
+        tile_service.get_datastore().open_dataset(primary_b.value)
+
+        tiles_bbox = box(parts['min_lon'], parts['min_lat'], parts['max_lon'], parts['max_lat'])
+        tiles_min_time = int((parser.parse(parts['start_time']) - EPOCH).total_seconds())
+        tiles_max_time = int((parser.parse(parts['end_time']) - EPOCH).total_seconds())
 
     # Increase spatial extents by the radius tolerance
     matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
@@ -801,15 +824,24 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
         polygon = Polygon(
             [(west, south), (east, south), (east, north), (west, north), (west, south)])
 
-        matchup_tiles = tile_service.find_tiles_in_polygon(
-            bounding_polygon=polygon,
-            ds=secondary_b.value,
-            start_time=matchup_min_time,
-            end_time=matchup_max_time,
-            fetch_data=True,
-            sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
-            rows=5000
-        )
+        if not tile_service.supports_direct_bounds_to_tile():
+            matchup_tiles = tile_service.find_tiles_in_polygon(
+                bounding_polygon=polygon,
+                ds=secondary_b.value,
+                start_time=matchup_min_time,
+                end_time=matchup_max_time,
+                fetch_data=True,
+                sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
+                rows=5000
+            )
+        else:
+            matchup_tiles = tile_service.get_nexus_data_for_bounds(matchup_min_lat, matchup_min_lon,
+                                                                   matchup_max_lat, matchup_max_lon,
+                                                                   matchup_min_time, matchup_max_time)
+
+            tile_ids = [tile_service.bounds_to_direct_tile_id(matchup_min_lat, matchup_min_lon,
+                                                                   matchup_max_lat, matchup_max_lon,
+                                                                   matchup_min_time, matchup_max_time)]
 
         # Convert Tile IDS to tiles and convert to UTM lat/lon projection.
         matchup_points = []
@@ -848,12 +880,16 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
                                   search_parameter, radius_tolerance, aeqd_proj):
     from nexustiles.model.nexusmodel import NexusPoint
     from webservice.algorithms_spark.Matchup import DomsPoint  # Must import DomsPoint or Spark complains
+    from nexustiles.dao import ZarrProxy
 
     # Load tile
     try:
         the_time = datetime.now()
-        tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
-                                                  tile_service.find_tile_by_id(tile_id))[0]
+        if not tile_service.supports_direct_bounds_to_tile():
+            tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
+                                                      tile_service.find_tile_by_id(tile_id))[0]
+        else:
+            tile = tile_service.fetch_direct_by_id(tile_id)[0].as_model_tile()
         print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
     except IndexError:
         # This should only happen if all measurements in a tile become masked after applying the bounding polygon
@@ -863,6 +899,7 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
     # Convert valid tile lat,lon tuples to UTM tuples
     the_time = datetime.now()
     # Get list of indices of valid values
+
     valid_indices = tile.get_indices()
     primary_points = np.array(
         [aeqd_proj(tile.longitudes[aslice[2]], tile.latitudes[aslice[1]]) for
@@ -896,3 +933,205 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
             for m_point_index in point_matches:
                 m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
                 yield p_doms_point, m_doms_point
+
+
+def mock_query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+               startIndex=0, stats=True, session=None):
+    mock_response = True
+
+    if mock_response:
+        return {
+	"total": 2,
+	"results": [
+		{
+			"depth": -99999.0,
+			"latitude": 29.87,
+			"longitude": -79.74,
+			"meta": "https://rda.ucar.edu/php/icoadsuid.php?uid=ORQ2YD",
+			"platform": {
+				"type": "7",
+				"code": "42",
+				"id": "13871"
+			},
+			"time": "2018-05-21T18:49:48Z",
+			"provider": "NCAR",
+			"project": "ICOADS Release 3.0",
+			"platform_code": "42",
+			"air_pressure": None,
+			"air_pressure_quality": None,
+			"air_temperature": None,
+			"air_temperature_quality": None,
+			"dew_point_temperature": None,
+			"dew_point_temperature_quality": None,
+			"downwelling_longwave_flux_in_air": None,
+			"downwelling_longwave_flux_in_air_quality": None,
+			"downwelling_longwave_radiance_in_air": None,
+			"downwelling_longwave_radiance_in_air_quality": None,
+			"downwelling_shortwave_flux_in_air": None,
+			"downwelling_shortwave_flux_in_air_quality": None,
+			"mass_concentration_of_chlorophyll_in_sea_water": None,
+			"mass_concentration_of_chlorophyll_in_sea_water_quality": None,
+			"rainfall_rate": None,
+			"rainfall_rate_quality": None,
+			"relative_humidity": None,
+			"relative_humidity_quality": None,
+			"sea_surface_salinity": None,
+			"sea_surface_salinity_quality": None,
+			"sea_surface_skin_temperature": None,
+			"sea_surface_skin_temperature_quality": None,
+			"sea_surface_subskin_temperature": None,
+			"sea_surface_subskin_temperature_quality": None,
+			"sea_surface_temperature": None,
+			"sea_surface_temperature_quality": None,
+			"sea_water_density": None,
+			"sea_water_density_quality": None,
+			"sea_water_electrical_conductivity": None,
+			"sea_water_electrical_conductivity_quality": None,
+			"sea_water_practical_salinity": None,
+			"sea_water_practical_salinity_quality": None,
+			"sea_water_salinity": None,
+			"sea_water_salinity_quality": None,
+			"sea_water_temperature": 26.1,
+			"sea_water_temperature_quality": 1,
+			"surface_downwelling_photosynthetic_photon_flux_in_air": None,
+			"surface_downwelling_photosynthetic_photon_flux_in_air_quality": None,
+			"wet_bulb_temperature": None,
+			"wet_bulb_temperature_quality": None,
+			"wind_speed": None,
+			"wind_speed_quality": None,
+			"wind_from_direction": None,
+			"wind_from_direction_quality": None,
+			"wind_to_direction": None,
+			"wind_to_direction_quality": None,
+			"eastward_wind": None,
+			"northward_wind": None,
+			"wind_component_quality": None,
+			"device": None,
+			"job_id": "84c9ca0c-207d-4e78-aa1d-4eeaec87d2a5"
+		},
+		{
+			"depth": 0.0,
+			"latitude": 29.87,
+			"longitude": -79.74,
+			"meta": "https://rda.ucar.edu/php/icoadsuid.php?uid=ORQ2YD",
+			"platform": {
+				"type": "7",
+				"code": "42",
+				"id": "13871"
+			},
+			"time": "2018-05-21T18:49:48Z",
+			"provider": "NCAR",
+			"project": "ICOADS Release 3.0",
+			"platform_code": "42",
+			"air_pressure": 1020.2,
+			"air_pressure_quality": 1,
+			"air_temperature": None,
+			"air_temperature_quality": None,
+			"dew_point_temperature": None,
+			"dew_point_temperature_quality": None,
+			"downwelling_longwave_flux_in_air": None,
+			"downwelling_longwave_flux_in_air_quality": None,
+			"downwelling_longwave_radiance_in_air": None,
+			"downwelling_longwave_radiance_in_air_quality": None,
+			"downwelling_shortwave_flux_in_air": None,
+			"downwelling_shortwave_flux_in_air_quality": None,
+			"mass_concentration_of_chlorophyll_in_sea_water": None,
+			"mass_concentration_of_chlorophyll_in_sea_water_quality": None,
+			"rainfall_rate": None,
+			"rainfall_rate_quality": None,
+			"relative_humidity": None,
+			"relative_humidity_quality": None,
+			"sea_surface_salinity": None,
+			"sea_surface_salinity_quality": None,
+			"sea_surface_skin_temperature": None,
+			"sea_surface_skin_temperature_quality": None,
+			"sea_surface_subskin_temperature": None,
+			"sea_surface_subskin_temperature_quality": None,
+			"sea_surface_temperature": None,
+			"sea_surface_temperature_quality": None,
+			"sea_water_density": None,
+			"sea_water_density_quality": None,
+			"sea_water_electrical_conductivity": None,
+			"sea_water_electrical_conductivity_quality": None,
+			"sea_water_practical_salinity": None,
+			"sea_water_practical_salinity_quality": None,
+			"sea_water_salinity": None,
+			"sea_water_salinity_quality": None,
+			"sea_water_temperature": None,
+			"sea_water_temperature_quality": None,
+			"surface_downwelling_photosynthetic_photon_flux_in_air": None,
+			"surface_downwelling_photosynthetic_photon_flux_in_air_quality": None,
+			"wet_bulb_temperature": None,
+			"wet_bulb_temperature_quality": None,
+			"wind_speed": None,
+			"wind_speed_quality": None,
+			"wind_from_direction": None,
+			"wind_from_direction_quality": None,
+			"wind_to_direction": None,
+			"wind_to_direction_quality": None,
+			"eastward_wind": None,
+			"northward_wind": None,
+			"wind_component_quality": None,
+			"device": None,
+			"job_id": "84c9ca0c-207d-4e78-aa1d-4eeaec87d2a5"
+		}
+	],
+	"last": "keep browsing next till there is nothing left",
+	"first": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?itemsPerPage=1000&startTime=2018-05-10T21:00:00Z&endTime=2018-05-29T21:00:00Z&bbox=-100.0,20.0,-79.0,29.884000009000008&minDepth=-20.0&maxDepth=10.0&provider=NCAR&project=ICOADS Release 3.0&platform=42",
+	"prev": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?itemsPerPage=1000&startTime=2018-05-10T21:00:00Z&endTime=2018-05-29T21:00:00Z&bbox=-100.0,20.0,-79.0,29.884000009000008&minDepth=-20.0&maxDepth=10.0&provider=NCAR&project=ICOADS Release 3.0&platform=42&markerTime=2018-05-21T18:49:48Z",
+	"next": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination?itemsPerPage=1000&startTime=2018-05-10T21:00:00Z&endTime=2018-05-29T21:00:00Z&bbox=-100.0,20.0,-79.0,29.884000009000008&minDepth=-20.0&maxDepth=10.0&provider=NCAR&project=ICOADS Release 3.0&platform=42&markerTime=2018-05-21T18:49:48Z&markerPlatform=3e7554e05503a345ffb806810d4b04fc0ff7e9c631d865ccbfbbce24c2e7817f"
+        }
+
+    try:
+        startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    provider = edge_endpoints.get_provider_name(dataset)
+
+    params = {
+        "itemsPerPage": itemsPerPage,
+        "startTime": startTime,
+        "endTime": endTime,
+        "bbox": bbox,
+        "minDepth": depth_min,
+        "maxDepth": depth_max,
+        "provider": provider,
+        "project": dataset,
+        "platform": platform,
+    }
+
+    if variable is not None:
+        params["variable"] = variable
+
+    edge_response = {}
+
+    # Get all edge results
+    next_page_url = edge_endpoints.getEndpoint()
+    while next_page_url is not None and next_page_url != 'NA':
+        logging.info(f'Edge request {next_page_url}')
+        if session is not None:
+            edge_page_request = session.get(next_page_url, params=params)
+        else:
+            edge_page_request = requests.get(next_page_url, params=params)
+
+        edge_page_request.raise_for_status()
+
+        edge_page_response = json.loads(edge_page_request.text)
+
+        if not edge_response:
+            edge_response = edge_page_response
+        else:
+            edge_response['results'].extend(edge_page_response['results'])
+
+        next_page_url = edge_page_response.get('next', None)
+        params = {}  # Remove params, they are already included in above URL
+
+    return edge_response
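
When the datastore supports direct bounds-to-tile lookup, the code above skips the Solr tile query and instead encodes the whole search domain into a single 'dynamic' tile id. A self-contained sketch of that round trip, lifted from bounds_to_direct_tile_id (nexustiles.py) and ZarrProxy.parse_tile_id_to_bounds in this commit; the bounds values are illustrative:

    def bounds_to_direct_tile_id(min_lat, min_lon, max_lat, max_lon,
                                 start_time, end_time, dataset='TILE'):
        # <dataset>_<start_time>_<end_time>_<min_lat>_<max_lat>_<min_lon>_<max_lon>
        return f"{dataset}_{start_time}_{end_time}_{min_lat}_{max_lat}_{min_lon}_{max_lon}"

    def parse_tile_id_to_bounds(tile_id):
        # Index from the right so dataset names containing '_' still parse
        c = tile_id.split('_')
        return {
            'id': tile_id,
            'start_time': c[-6],
            'end_time': c[-5],
            'min_lat': float(c[-4]),
            'max_lat': float(c[-3]),
            'min_lon': float(c[-2]),
            'max_lon': float(c[-1])
        }

    tid = bounds_to_direct_tile_id(20, -100, 30, -79,
                                   '2018-01-01T09:00:00', '2018-09-01T00:00:00')
    assert parse_tile_id_to_bounds(tid)['max_lon'] == -79.0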
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 39d08cf..4423e3a 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -250,10 +250,15 @@ class NexusCalcSparkHandler(NexusCalcHandler):
             # t1 = time.time()
             # print 'NEXUS call start at time %f' % t1
             # sys.stdout.flush()
-            nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
-            nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
-                                                               min_lon, max_lon,
-                                                               nexus_tiles))
+            if not tile_service.supports_direct_bounds_to_tile():
+                nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
+                nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+                                                                   min_lon, max_lon,
+                                                                   nexus_tiles))
+            else:
+                nexus_tiles = list(tile_service.get_nexus_data_for_bounds(min_lat, min_lon,
+                                                                          max_lat, max_lon,
+                                                                          start_time, end_time))
             # t2 = time.time()
             # print 'NEXUS call end at time %f' % t2
             # print 'Seconds in NEXUS call: ', t2-t1
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index faeaa0b..bd97948 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -488,17 +488,30 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
                                             timestamps[0],
                                             timestamps[-1],
                                             rows=5000,
-                                            metrics_callback=metrics_callback)
+                                            metrics_callback=metrics_callback,
+                                            split=[
+                                                datetime.utcfromtimestamp(ts).strftime(ISO_8601) for ts in timestamps
+                                            ])
     
     calculation_start = datetime.now()
 
     tile_dict = {}
     for timeinseconds in timestamps:
-        tile_dict[timeinseconds] = []
+        tile_dict[float(timeinseconds)] = []
 
     for i in range(len(ds1_nexus_tiles)):
         tile = ds1_nexus_tiles[i]
-        tile_dict[tile.times[0]].append(i)
+
+        ts = tile.times[0]
+
+        if not isinstance(ts, (int, np.int64)):
+            logger.info(f"Non-integer type of timestamp: {type(ts)}")
+            ts = np.datetime64(ts, 's').astype(int)
+            tile.times = [ts]
+
+        tile_dict[ts].append(i)
+
+    logger.info(f'filled: {tile_dict}')
 
     stats_arr = []
     for timeinseconds in timestamps:
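
The loop above normalizes tile timestamps that arrive as numpy.datetime64 values (as they do from ZarrProxy tiles) into integer epoch seconds so they can key into tile_dict. The conversion in isolation:

    import numpy as np

    ts = np.datetime64('2018-05-21T18:49:48')
    # Casting to second precision and then to int yields seconds since the epoch
    epoch_seconds = np.datetime64(ts, 's').astype(int)
    assert epoch_seconds == 1526928588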
diff --git a/data-access/nexustiles/dao/ZarrProxy.py b/data-access/nexustiles/dao/ZarrProxy.py
new file mode 100644
index 0000000..5c57f94
--- /dev/null
+++ b/data-access/nexustiles/dao/ZarrProxy.py
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import boto3
+import numpy as np
+import s3fs
+import xarray as xr
+from dask.diagnostics import ProgressBar
+from webservice.webmodel import NexusProcessingException
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+# h = logging.StreamHandler()
+# h.setLevel(logging.DEBUG)
+# h.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+#
+# logger.addHandler(h)
+
+
+class NexusDataTile(object):
+    __data = None
+    tile_id = None
+
+    def __init__(self, data, _tile_id, var_name, coords, lazy=False):   # data: dataset subset over the tile's spatiotemporal range; coords: coordinate name mapping
+        import re
+
+        if self.__data is None:
+            self.__data = data
+
+        if self.tile_id is None:
+            self.tile_id = _tile_id
+
+        if not re.search("^.*_[0-9-T :.+]*_[0-9-T :.+]*_[0-9.-]*_[0-9.-]*_[0-9.-]*_[0-9.-]*$", self.tile_id):
+            raise NexusProcessingException(reason="Bad tile id", code=500)
+
+        self.__vars = var_name
+
+        self.__coords = coords
+
+        self.__lat, self.__lon, self.__time, self.__vdata, self.__meta, self.__mv = self._get_data()
+
+    def get_raw_data_array(self):
+        return self.__data
+
+    def get_lat_lon_time_data_meta(self):
+        return self.__lat, self.__lon, self.__time, self.__vdata, self.__meta, self.__mv
+
+    def as_model_tile(self):
+        from nexustiles.model.nexusmodel import Tile, TileVariable
+
+        tile = Tile()
+
+        tile.latitudes = self.__lat
+        tile.longitudes = self.__lon
+        tile.times = self.__time
+        tile.data = self.__vdata
+        tile.is_multi = self.__mv
+        tile.meta_data = self.__meta
+        tile.tile_id = self.tile_id
+
+        tile.dataset = self.__meta['main']['id'] if 'id' in self.__meta['main'] else None
+        tile.dataset_id = self.__meta['main']['uuid'] if 'uuid' in self.__meta['main'] else None
+        tile.granule = self.__meta['main']['granules'] if 'granules' in self.__meta['main'] else None
+
+        variables = []
+
+        for var in self.__data.data_vars:
+            try:
+                standard_name = self.__meta[var]['standard_name']
+            except:
+                standard_name = None
+
+            variables.append(TileVariable(var, standard_name))
+
+        tile.variables = variables
+
+        return tile
+
+    def _get_data(self):
+        isMultiVar = False
+
+        metadata = {'main': self.__data.attrs, 'lat': self.__data[self.__coords['latitude']].attrs,
+                    'lon': self.__data[self.__coords['longitude']].attrs, 'time': self.__data[self.__coords['time']].attrs}
+
+        for var in self.__vars:
+            metadata[var['name_s']] = self.__data[var['name_s']].attrs
+
+        tile_type = 'grid_tile'
+
+        if not isMultiVar:
+            self.__vars = self.__vars[0]
+
+        if tile_type == 'grid_tile': #for now, assume gridded
+            latitude_data = np.ma.masked_invalid(self.__data[self.__coords['latitude']])
+            longitude_data = np.ma.masked_invalid(self.__data[self.__coords['longitude']])
+            grid_tile_data = np.ma.masked_invalid(self.__data[self.__vars['name_s']])
+        else:
+            raise NotImplementedError("Only supports grid_tile")
+
+        if len(grid_tile_data.shape) == 2:
+            grid_tile_data = grid_tile_data[np.newaxis, :]
+
+        return latitude_data, longitude_data, self.__data[self.__coords['time']].values, grid_tile_data, metadata, isMultiVar
+
+class ZarrProxy(object):
+    mock_solr_for_testing = True
+
+    def __init__(self, config, test_fs = None, open_direct=False, **kwargs):
+        from .SolrProxy import SolrProxy
+
+        import io, configparser
+
+        self.config = config
+        self.__s3_bucket_name = config.get("s3", "bucket")
+        self.__s3_region = config.get("s3", "region")
+        self.__s3_public = config.getboolean("s3", "public", fallback=False)
+        self.__s3_profile = config.get("s3", "profile", fallback=None)
+        self.__s3 = boto3.resource('s3')
+        self.__nexus_tile = None
+
+        self.__coords = {
+            "latitude": "lat",
+            "longitude": "lon",
+            "time": "time"
+        }
+
+        solr_config_txt = f"""
+        [solr]
+        host={config.get("solr", "host", fallback='http://localhost:8983')}
+        core=nexusdatasets
+        """
+
+        buf = io.StringIO(solr_config_txt)
+        solr_config = configparser.ConfigParser()
+        solr_config.read_file(buf)
+
+        if not ZarrProxy.mock_solr_for_testing:
+            self._metadata_store = SolrProxy(solr_config)
+        else:
+            import mock
+
+            mock_solr = mock.MagicMock()
+            mock_solr.do_query_raw = ZarrProxy.mock_query
+
+            self._metadata_store = mock_solr
+
+
+        if open_direct:
+            logger.info('Opening Zarr proxy')
+
+            if self.__s3_public:
+                store = f"https://{self.__s3_bucket_name}.s3.{self.__s3_region}.amazonaws.com/{self.config.get('s3', 'key')}"
+            else:
+                s3path = f"s3://{self.__s3_bucket_name}/{self.config.get('s3', 'key')}/"
+                s3 = s3fs.S3FileSystem(self.__s3_public, profile=self.__s3_profile) if test_fs is None else test_fs
+                store = s3fs.S3Map(root=s3path, s3=s3, check=False)
+
+            zarr_data = xr.open_zarr(store=store, consolidated=True, mask_and_scale=False)
+            zarr_data.analysed_sst.attrs['_FillValue'] = -32768
+            zarr_data = xr.decode_cf(zarr_data, mask_and_scale=True)
+
+            self.__variables = [{"name_s": "analysed_sst", "fill_d": -32768}]
+
+            logger.info('Successfully opened Zarr proxy')
+
+            self.__zarr_data = zarr_data
+
+    def _get_ds_info(self, ds):
+        store = self._metadata_store
+
+        query_response = store.do_query_raw(f'id:{ds}')
+
+        if not query_response['responseHeader']['status'] == 0:
+            raise NexusProcessingException(reason="bad solr response")
+
+        if not query_response['response']['numFound'] == 1:
+            raise NexusProcessingException(
+                reason=f"wrong number of datasets returned from solr: {query_response['response']['numFound']}",
+                code=400 if query_response['response']['numFound'] == 0 else 500
+            )
+
+        ds_info = query_response['response']['docs'][0]
+
+        logger.debug(f'S3 URI: {ds_info["s3_uri_s"]}')
+
+        return ds_info['variables'], ds_info['public_b'], ds_info['s3_uri_s'], ds_info['coordinate_vars']
+
+    def open_dataset(self, ds, test_fs = None):
+        variables, public, path, coords = self._get_ds_info(ds)
+
+        logger.info(f'Opening Zarr proxy for {ds}')
+
+        if public:
+            store = path
+        else:
+            s3path = path
+            s3 = s3fs.S3FileSystem(public, profile=self.__s3_profile) if test_fs is None else test_fs
+            store = s3fs.S3Map(root=s3path, s3=s3, check=False)
+
+        zarr_data = xr.open_zarr(store=store, consolidated=True, mask_and_scale=False)
+
+        for variable in variables:
+            zarr_data[variable['name_s']].attrs['_FillValue'] = variable['fill_d']
+
+        zarr_data = xr.decode_cf(zarr_data, mask_and_scale=True)
+
+        logger.info(f'Successfully opened Zarr proxy for {ds}')
+
+        self.__zarr_data = zarr_data
+        self.__variables = variables
+        self.__coords = coords
+
+    # Interpreting tile ids as: MUR_<start_time>_<end_time>_<lat_min>_<lat_max>_<lon_min>_<lon_max>
+    def fetch_nexus_tiles(self, *tile_ids):
+        import re
+
+        if not isinstance(tile_ids[0], str):
+            tile_ids = [str(tile.tile_id) for tile in tile_ids]
+
+        logger.debug(f'Starting fetch for {len(tile_ids)} tiles')
+
+        res = []
+
+        for tid in tile_ids:
+            parts = ZarrProxy.parse_tile_id_to_bounds(tid)
+
+            tz_regex = "\\+00:00$"
+
+            if re.search(tz_regex, parts['start_time']):
+                parts['start_time'] = re.split(tz_regex, parts['start_time'])[0]
+
+            if re.search(tz_regex, parts['end_time']):
+                parts['end_time'] = re.split(tz_regex, parts['end_time'])[0]
+
+            logger.debug(f"getting {parts['id']}")
+
+            times = slice(parts['start_time'], parts['end_time'])
+            lats = slice(parts['min_lat'], parts['max_lat'])
+            lons = slice(parts['min_lon'], parts['max_lon'])
+
+            idx = {
+                self.__coords['latitude']: lats,
+                self.__coords['longitude']: lons,
+                self.__coords['time']: times,
+            }
+
+            nexus_tile = NexusDataTile(self.__zarr_data.sel(idx), parts['id'], self.__variables, self.__coords)
+            res.append(nexus_tile)
+
+        return res
+
+    def find_days_in_range_asc(self, min_time, max_time):
+        max_time = datetime.utcfromtimestamp(max_time).isoformat()
+        min_time = datetime.utcfromtimestamp(min_time).isoformat()
+
+        times = slice(min_time, max_time)
+        time_subset = self.__zarr_data.sel(time=times)['time'].data
+
+        time_subset = time_subset.astype(datetime)/1000000000
+
+        # return sorted([datetime.utcfromtimestamp(t).isoformat() for t in time_subset])
+        return sorted(time_subset.tolist())
+
+
+    @staticmethod
+    def parse_tile_id_to_bounds(tile_id):
+        import re
+
+        c = re.split("_", tile_id)
+
+        parts = {
+            'id': tile_id,
+            'start_time': c[-6],
+            'end_time': c[-5],
+            'min_lat': float(c[-4]),
+            'max_lat': float(c[-3]),
+            'min_lon': float(c[-2]),
+            'max_lon': float(c[-1])
+        }
+
+        return parts
+
+    @staticmethod
+    def mock_query(ds):
+        import json
+
+        if ds == "id:MUR25-JPL-L4-GLOB-v04.2":
+            return json.load(open("/Users/rileykk/repo/incubator-sdap-nexus/data-access/tests/mock_mur_meta.json"))
+        elif ds == "id:OISSS_L4_multimission_7day_v1":
+            return json.load(open("/Users/rileykk/repo/incubator-sdap-nexus/data-access/tests/mock_oisss_meta.json"))
+        elif ds == "id:JPL-L4-MRVA-CHLA-GLOB-v3.0":
+            return json.load(open("/Users/rileykk/repo/incubator-sdap-nexus/data-access/tests/mock_chla_meta.json"))
+        elif ds == "id:SMAP_JPL_L3_SSS_CAP_8DAY-RUNNINGMEAN_V5":
+            return json.load(open("/Users/rileykk/repo/incubator-sdap-nexus/data-access/tests/mock_smap_meta.json"))
+        else:
+            raise ValueError("unsupported dataset")
+
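
A hypothetical usage sketch of the proxy defined above. The config values and dataset name are assumptions for illustration, and mock_solr_for_testing (which defaults to True) must be disabled so dataset metadata is resolved from a real Solr nexusdatasets core:

    import configparser
    import io

    from nexustiles.dao.ZarrProxy import ZarrProxy

    config = configparser.ConfigParser()
    config.read_file(io.StringIO("""
    [s3]
    bucket=cdms-dev-zarr
    key=MUR25-JPL-L4-GLOB-v04.2
    region=us-west-2
    public=false
    [solr]
    host=http://localhost:8983
    """))

    ZarrProxy.mock_solr_for_testing = False   # query Solr instead of the baked-in mock
    proxy = ZarrProxy(config)                 # lazy: no dataset opened yet
    proxy.open_dataset('MUR25-JPL-L4-GLOB-v04.2')

    tiles = proxy.fetch_nexus_tiles(
        'MUR25-JPL-L4-GLOB-v04.2_2018-01-01T09:00:00_2018-09-01T00:00:00_20_30_-100_-79'
    )
    lat, lon, times, data, meta, is_multi = tiles[0].get_lat_lon_time_data_meta()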
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index a3aa61e..43696b8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -29,6 +29,7 @@ from shapely.geometry import MultiPolygon, box
 from .dao import CassandraProxy
 from .dao import DynamoProxy
 from .dao import S3Proxy
+from .dao import ZarrProxy
 from .dao import SolrProxy
 from .dao import ElasticsearchProxy
 
@@ -83,12 +84,21 @@ class NexusTileService(object):
         self._datastore = None
         self._metadatastore = None
 
+        self.current_ds = None
+
         self._config = configparser.RawConfigParser()
         self._config.read(NexusTileService._get_config_files('config/datastores.ini'))
 
         if config:
             self.override_config(config)
 
+        if not skipMetadatastore:
+            metadatastore = self._config.get("metadatastore", "store", fallback='solr')
+            if metadatastore == "solr":
+                self._metadatastore = SolrProxy.SolrProxy(self._config)
+            elif metadatastore == "elasticsearch":
+                self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config)
+
         if not skipDatastore:
             datastore = self._config.get("datastore", "store")
             if datastore == "cassandra":
@@ -97,16 +107,14 @@ class NexusTileService(object):
                 self._datastore = S3Proxy.S3Proxy(self._config)
             elif datastore == "dynamo":
                 self._datastore = DynamoProxy.DynamoProxy(self._config)
+            elif datastore == "zarrS3":
+                if not skipMetadatastore and metadatastore == "solr":
+                    self._datastore = ZarrProxy.ZarrProxy(self._config, metadata_store=self._metadatastore)
+                else:
+                    self._datastore = ZarrProxy.ZarrProxy(self._config)
             else:
                 raise ValueError("Error reading datastore from config file")
 
-        if not skipMetadatastore:
-            metadatastore = self._config.get("metadatastore", "store", fallback='solr')
-            if metadatastore == "solr":
-                self._metadatastore = SolrProxy.SolrProxy(self._config)
-            elif metadatastore == "elasticsearch":
-                self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config)
-
     def override_config(self, config):
         for section in config.sections():
             if self._config.has_section(section):  # only override preexisting section, ignores the other
@@ -131,7 +139,14 @@ class NexusTileService(object):
     def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
                                metrics_callback=None, **kwargs):
         start = datetime.now()
-        result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
+        if self.supports_direct_bounds_to_tile():
+            if self.current_ds != dataset:
+                self._datastore.open_dataset(dataset)
+                self.current_ds = dataset
+
+            result = self._datastore.find_days_in_range_asc(start_time, end_time)
+        else:
+            result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
                                                             end_time,
                                                             **kwargs)
         duration = (datetime.now() - start).total_seconds()
@@ -259,10 +274,41 @@ class NexusTileService(object):
 
     def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
                                  **kwargs):
-        tiles = self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
-        tiles = self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles)
-        if 0 <= start_time <= end_time:
-            tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
+        if self.supports_direct_bounds_to_tile():
+            if ds and ds != self.current_ds:
+                self._datastore.open_dataset(ds)
+                self.current_ds = ds
+
+            ISO = '%Y-%m-%dT%H:%M:%S%z'
+
+            start_time = datetime.utcfromtimestamp(start_time).strftime(ISO)
+            end_time = datetime.utcfromtimestamp(end_time).strftime(ISO)
+
+            if 'split' not in kwargs:
+                tiles = [t.as_model_tile() for t in self._datastore.fetch_nexus_tiles(
+                    self.bounds_to_direct_tile_id(min_lat, min_lon, max_lat, max_lon, start_time, end_time, ds)
+                 )]
+            else:
+                timestamps = kwargs['split']
+
+                tiles = []
+                tile_ids = []
+
+                for i in range(len(timestamps)):
+                    tile_ids.append(self.bounds_to_direct_tile_id(min_lat, min_lon, max_lat, max_lon, timestamps[i], timestamps[i], ds))
+
+                    #tiles.extend(
+                    #    [t.as_model_tile() for t in self._datastore.fetch_nexus_tiles(
+                    #        self.bounds_to_direct_tile_id(min_lat, min_lon, max_lat, max_lon, timestamps[i], timestamps[i], ds)
+                    #    )]
+                    #)
+
+                tiles.extend([t.as_model_tile() for t in self._datastore.fetch_nexus_tiles(*tile_ids)])
+        else:
+            tiles = self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
+            tiles = self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles)
+            if 0 <= start_time <= end_time:
+                tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
 
         return tiles
 
@@ -475,6 +521,55 @@ class NexusTileService(object):
 
         return tiles
 
+    def bounds_to_direct_tile_id(self, min_lat, min_lon, max_lat, max_lon, start_time, end_time, dataset='TILE'):
+        return f"{dataset}_{start_time}_{end_time}_{min_lat}_{max_lat}_{min_lon}_{max_lon}"
+
+    def fetch_direct_by_id(self, tid):
+        return self._datastore.fetch_nexus_tiles(tid)
+
+    def get_nexus_data_for_bounds(self, min_lat, min_lon, max_lat, max_lon, start_time, end_time, dataset='TILE'):
+        """
+        Directly fetch tile data that fits to the given bounds without having to query the metadata store first.
+        Only works if the data store supports this (i.e., ZarrProxy).
+        Instead of using UUIDs for the tiles, the tile bounds are encoded in a tile id string and the proxy builds a
+        'dynamic' tile out of those bounds.
+        :param min_lat: Minimum bound latitude
+        :param min_lon: Minimum bound longitude
+        :param max_lat: Maximum bound latitude
+        :param max_lon: Maximum bound longitude
+        :param start_time: Start of temporal bound
+        :param end_time: End of temporal bound
+        :param dataset: Name of dataset (For now, ZarrProxy only supports MUR)
+        :return: tile data
+        """
+
+        class TileData:
+            latitudes = None
+            longitudes = None
+            times = None
+            data = None
+            meta_data = None
+            is_multi = None
+
+            def __init__(self, tid):
+                self.tile_id = tid
+
+        tile = TileData(self.bounds_to_direct_tile_id(min_lat, min_lon, max_lat, max_lon, start_time, end_time, dataset))
+
+        lats, lons, times, data, meta, is_multi_var = self.fetch_direct_by_id(tile)[0].get_lat_lon_time_data_meta()
+
+        tile.latitudes = lats
+        tile.longitudes = lons
+        tile.times = times
+        tile.data = data
+        tile.meta_data = meta
+        tile.is_multi = is_multi_var
+
+        return [tile]
+
+    def supports_direct_bounds_to_tile(self):
+        return isinstance(self._datastore, ZarrProxy.ZarrProxy)
+
     def _metadata_store_docs_to_tiles(self, *store_docs):
 
         tiles = []
@@ -595,6 +690,9 @@ class NexusTileService(object):
         else:
             return False
 
+    def get_datastore(self):
+        return self._datastore
+
     @staticmethod
     def _get_config_files(filename):
         log = logging.getLogger(__name__)
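
Wiring the proxy through NexusTileService follows the same pattern the unit tests below use: construct the service with both stores skipped, then attach a ZarrProxy directly. A sketch under those assumptions (bucket, key, and bounds are illustrative):

    import configparser
    import io

    from nexustiles.nexustiles import NexusTileService
    from nexustiles.dao.ZarrProxy import ZarrProxy

    cfg = configparser.ConfigParser()
    cfg.read_file(io.StringIO("""
    [s3]
    bucket=cdms-dev-zarr
    key=MUR25-JPL-L4-GLOB-v04.2
    region=us-west-2
    public=false
    [datastore]
    store=zarrS3
    """))

    svc = NexusTileService(skipDatastore=True, skipMetadatastore=True, config=cfg)
    svc._datastore = ZarrProxy(cfg, open_direct=True)

    assert svc.supports_direct_bounds_to_tile()
    tile = svc.get_nexus_data_for_bounds(20, -100, 30, -79,
                                         '2018-01-01T09:00:00',
                                         '2018-09-01T00:00:00')[0]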
diff --git a/data-access/tests/data/mock_insitu_schema.json b/data-access/tests/data/mock_insitu_schema.json
new file mode 100644
index 0000000..6502f18
--- /dev/null
+++ b/data-access/tests/data/mock_insitu_schema.json
@@ -0,0 +1 @@
+{"$schema": "http://json-schema.org/draft-07/schema#", "title": "Cloud-based Data Match-Up Service In Situ Schema", "description": "Schema for in situ data", "properties": {"provider": {"description": "", "type": "string"}, "project": {"description": "", "type": "string"}, "observations": {"type": "array", "items": {"$ref": "#/definitions/observation"}, "minItems": 1}}, "definitions": {"platform": {"description": "", "type": "object", "additionalProperties": false, "properties": {"code": [...]
\ No newline at end of file
diff --git a/data-access/tests/data/mock_response_direct.json b/data-access/tests/data/mock_response_direct.json
new file mode 100644
index 0000000..387185a
--- /dev/null
+++ b/data-access/tests/data/mock_response_direct.json
@@ -0,0 +1 @@
+{"total": 2327, "results": [{"depth": -99999.0, "latitude": 29.91, "longitude": -81.29, "meta": "https://rda.ucar.edu/php/icoadsuid.php?uid=OL2L68", "platform": {"type": "7", "code": "42", "id": "42563"}, "time": "2018-03-26T15:00:00Z", "provider": "NCAR", "project": "ICOADS Release 3.0", "platform_code": "42", "air_pressure": null, "air_pressure_quality": null, "air_temperature": null, "air_temperature_quality": null, "dew_point_temperature": null, "dew_point_temperature_quality": null, [...]
\ No newline at end of file
diff --git a/data-access/tests/data/mock_response_indirect.json b/data-access/tests/data/mock_response_indirect.json
new file mode 100644
index 0000000..d525bf1
--- /dev/null
+++ b/data-access/tests/data/mock_response_indirect.json
@@ -0,0 +1 @@
+{"total": 827, "results": [{"depth": -99999.0, "latitude": 27.3, "longitude": -87.38, "meta": "https://rda.ucar.edu/php/icoadsuid.php?uid=NB7GX5", "platform": {"type": "7", "code": "42", "id": "43572"}, "time": "2017-03-12T12:15:00Z", "provider": "NCAR", "project": "ICOADS Release 3.0", "platform_code": "42", "air_pressure": null, "air_pressure_quality": null, "air_temperature": null, "air_temperature_quality": null, "dew_point_temperature": null, "dew_point_temperature_quality": null, " [...]
\ No newline at end of file
diff --git a/data-access/tests/mock_chla_meta.json b/data-access/tests/mock_chla_meta.json
new file mode 100644
index 0000000..678fff7
--- /dev/null
+++ b/data-access/tests/mock_chla_meta.json
@@ -0,0 +1,38 @@
+{
+  "responseHeader": {
+    "status": 0
+  },
+  "response": {
+    "numFound": 1,
+    "docs": [
+      {
+        "id": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "latest_update_l": 1637632069,
+        "_version_": 1723706460913795072,
+        "dataset_s": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "variables": [{
+            "name_s": "CHLA_analysis",
+            "fill_d": -999.0
+        },{
+            "name_s": "analysis_error",
+            "fill_d": -999.0
+        },{
+            "name_s": "dt_data",
+            "fill_d": -128
+        },{
+            "name_s": "mask",
+            "fill_d": -128
+        }],
+        "coordinate_vars": {
+          "latitude": "lat",
+          "longitude": "lon",
+          "time": "time"
+        },
+        "s3_uri_s": "s3://cdms-dev-zarr/JPL-L4-MRVA-CHLA-GLOB-v3.0/",
+        "public_b": false,
+        "type_s": "gridded",
+        "chunk_shape": [30, 120, 184]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/data-access/tests/mock_mur_meta.json b/data-access/tests/mock_mur_meta.json
new file mode 100644
index 0000000..fa6d627
--- /dev/null
+++ b/data-access/tests/mock_mur_meta.json
@@ -0,0 +1,29 @@
+{
+  "responseHeader": {
+    "status": 0
+  },
+  "response": {
+    "numFound": 1,
+    "docs": [
+      {
+        "id": "MUR25-JPL-L4-GLOB-v04.2",
+        "latest_update_l": 1637629358,
+        "_version_": 1718445323844583426,
+        "dataset_s": "MUR25-JPL-L4-GLOB-v04.2",
+        "variables": [{
+            "name_s": "analysed_sst",
+            "fill_d": -32768
+        }],
+        "coordinate_vars": {
+          "latitude": "lat",
+          "longitude": "lon",
+          "time": "time"
+        },
+        "s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/",
+        "public_b": false,
+        "type_s": "gridded",
+        "chunk_shape": [30, 120, 240]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/data-access/tests/mock_oisss_meta.json b/data-access/tests/mock_oisss_meta.json
new file mode 100644
index 0000000..41ee899
--- /dev/null
+++ b/data-access/tests/mock_oisss_meta.json
@@ -0,0 +1,29 @@
+{
+  "responseHeader": {
+    "status": 0
+  },
+  "response": {
+    "numFound": 1,
+    "docs": [
+      {
+        "id": "OISSS_L4_multimission_7day_v1",
+        "latest_update_l": 1646163906,
+        "_version_": 1726128369353883648,
+        "dataset_s": "OISSS_L4_multimission_7day_v1",
+        "variables": [{
+            "name_s": "sss",
+            "fill_d": -999.0
+        }],
+        "coordinate_vars": {
+          "latitude": "lat",
+          "longitude": "lon",
+          "time": "time"
+        },
+        "s3_uri_s": "s3://cdms-dev-zarr/OISSS_L4_multimission_7day_v1/",
+        "public_b": false,
+        "type_s": "gridded",
+        "chunk_shape": [30, 120, 184]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/data-access/tests/mock_smap_meta.json b/data-access/tests/mock_smap_meta.json
new file mode 100644
index 0000000..2fe4363
--- /dev/null
+++ b/data-access/tests/mock_smap_meta.json
@@ -0,0 +1,29 @@
+{
+  "responseHeader": {
+    "status": 0
+  },
+  "response": {
+    "numFound": 1,
+    "docs": [
+      {
+        "id": "SMAP_JPL_L3_SSS_CAP_8DAY-RUNNINGMEAN_V5",
+        "latest_update_l": 1637629358,
+        "_version_": 1718445323844583426,
+        "dataset_s": "SMAP_JPL_L3_SSS_CAP_8DAY-RUNNINGMEAN_V5",
+        "variables": [{
+            "name_s": "smap_sss",
+            "fill_d": -9999.0
+        }],
+        "coordinate_vars": {
+          "latitude": "latitude",
+          "longitude": "longitude",
+          "time": "time"
+        },
+        "s3_uri_s": "s3://cdms-dev-zarr/SMAP_JPL_L3_SSS_CAP_8DAY-RUNNINGMEAN_V5_7_120_240.zarr/",
+        "public_b": false,
+        "type_s": "gridded",
+        "chunk_shape": [7, 120, 240]
+      }
+    ]
+  }
+}
\ No newline at end of file
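
These mock documents mirror what ZarrProxy._get_ds_info expects back from the nexusdatasets Solr core. The fields it unpacks, shown against an abridged copy of the MUR doc above:

    doc = {
        "id": "MUR25-JPL-L4-GLOB-v04.2",
        "variables": [{"name_s": "analysed_sst", "fill_d": -32768}],
        "coordinate_vars": {"latitude": "lat", "longitude": "lon", "time": "time"},
        "s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/",
        "public_b": False
    }

    variables = doc['variables']      # fill values applied before xr.decode_cf
    public = doc['public_b']          # if public, the path is used directly as the store
    path = doc['s3_uri_s']            # root of the Zarr store in S3
    coords = doc['coordinate_vars']   # canonical coordinate name -> dataset name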
diff --git a/data-access/tests/test_zarr.py b/data-access/tests/test_zarr.py
new file mode 100644
index 0000000..bd9fa97
--- /dev/null
+++ b/data-access/tests/test_zarr.py
@@ -0,0 +1,398 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import configparser
+import io
+import os
+
+import json
+import mock
+import numpy as np
+import pytest
+import s3fs
+from moto import mock_s3
+from nexustiles.nexustiles import NexusTileService
+import webservice.algorithms_spark.Matchup as matchup
+
+
+class DummyTile:
+    def __init__(self, tile_id):
+        self.tile_id = tile_id
+
+class MockSparkParam:
+    def __init__(self, value):
+        self.value = value
+
+mock_s3 = mock_s3()
+bucket_name = 'cdms-dev-zarr'
+root_key = ""
+region = 'us-west-2'
+
+port = 5555
+endpoint = f"http://127.0.0.1:{port}"
+
+test_data_path = os.getenv('TEST_DATA', 'data/zarr_test_data/')
+
+@pytest.fixture(scope="module")
+def s3():
+    # Got this from s3fs test on github
+    # https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
+
+    import shlex
+    import subprocess
+    import time
+    import requests
+
+    try:
+        # should fail since we didn't start server yet
+        r = requests.get(endpoint)
+    except:
+        pass
+    else:
+        if r.ok:
+            raise RuntimeError("moto server already up")
+    if "AWS_SECRET_ACCESS_KEY" not in os.environ:
+        os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
+    if "AWS_ACCESS_KEY_ID" not in os.environ:
+        os.environ["AWS_ACCESS_KEY_ID"] = "foo"
+    proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port))
+
+    timeout = 5
+    while timeout > 0:
+        try:
+            r = requests.get(endpoint)
+            if r.ok:
+                break
+        except:
+            pass
+        timeout -= 0.1
+        time.sleep(0.1)
+
+    from botocore.session import Session
+    session = Session()
+    client = session.create_client("s3", endpoint_url=endpoint)
+
+    client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
+
+    fs = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint})
+
+    for root, dirs, filenames in os.walk(test_data_path):
+        for file in filenames:
+            local = os.path.join(root, file)
+            rel = os.path.relpath(local, test_data_path)
+
+            key = os.path.join(bucket_name, root_key, rel)
+
+            fs.put(local, key)
+
+    yield fs
+    proc.terminate()
+    proc.wait()
+
+@pytest.fixture()
+def bounds():
+    yield {
+        'min_lat': 20,
+        'max_lat': 30,
+        'min_lon': -100,
+        'max_lon': -79,
+        'start_time': '2018-01-01T09:00:00+00:00',
+        'end_time': '2018-09-01T00:00:00+00:00'
+    }
+
+@pytest.fixture()
+def ts_config():
+    cfg = f"""
+        [s3]
+        bucket={bucket_name}
+        key={root_key}
+        region={region}
+        public=false
+        [datastore]
+        store=zarrS3
+        """
+
+    buf = io.StringIO(cfg)
+    config = configparser.ConfigParser()
+    config.read_file(buf)
+
+    yield config
+
+def do_nothing(arg):
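+    """No-op used to stub out ZarrProxy.open_dataset once the test dataset is wired up."""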
+    pass
+
+@pytest.fixture()
+def tile_service_direct(s3, ts_config):
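+    """NexusTileService backed by a ZarrProxy that opens the MUR dataset directly
+    from the mock bucket; no metadata store lookup is involved."""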
+    from nexustiles.dao.ZarrProxy import ZarrProxy
+
+    ts_config.set("s3", "key", "MUR25-JPL-L4-GLOB-v04.2")
+
+    svc = NexusTileService(skipMetadatastore=True, config=ts_config, skipDatastore=True)
+    svc._datastore = ZarrProxy(ts_config, open_direct=True, test_fs=s3)
+    svc._datastore.open_dataset = do_nothing
+
+    yield svc
+
+@pytest.fixture()
+def tile_service_indirect(s3, ts_config):
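+    """NexusTileService backed by a ZarrProxy that resolves dataset locations
+    through a mocked Solr metadata store."""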
+    from nexustiles.dao.ZarrProxy import ZarrProxy
+
+    svc = NexusTileService(skipMetadatastore=True, config=ts_config, skipDatastore=True)
+    svc._datastore = ZarrProxy(ts_config, test_fs=s3)
+
+    def mock_query(ds):
+        if ds == "id:MUR25-JPL-L4-GLOB-v04.2":
+            return json.load(open("mock_mur_meta.json"))
+        elif ds == "id:OISSS_L4_multimission_7day_v1":
+            return json.load(open("mock_oisss_meta.json"))
+        else:
+            raise ValueError(f"unsupported dataset: {ds}")
+
+    mock_solr = mock.MagicMock()
+    mock_solr.do_query_raw = mock_query
+
+    svc._datastore._metadata_store = mock_solr
+    svc._datastore.open_dataset("OISSS_L4_multimission_7day_v1", test_fs=s3)
+    svc._datastore.open_dataset = do_nothing
+
+    yield svc
+
+def test_setup(s3):
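+    """Smoke test: passes if the s3 fixture (moto server + test data upload) came up."""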
+    assert s3.exists(bucket_name)
+
+def test_bounds(bounds, tile_service_direct):
+    tile_id = tile_service_direct.bounds_to_direct_tile_id(
+        bounds['min_lat'],
+        bounds['min_lon'],
+        bounds['max_lat'],
+        bounds['max_lon'],
+        bounds['start_time'],
+        bounds['end_time']
+    )
+
+    tiles = [DummyTile(tile_id)]
+
+    tile_data = tile_service_direct.fetch_data_for_tiles(*tiles)[0]
+
+    assert bounds['min_lat'] <= np.amin(tile_data.latitudes)
+    assert bounds['max_lat'] >= np.amax(tile_data.latitudes)
+    assert bounds['min_lon'] <= np.amin(tile_data.longitudes)
+    assert bounds['max_lon'] >= np.amax(tile_data.longitudes)
+
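+    # The tile times stringify as ISO 8601 (numpy datetime64), so these
+    # lexicographic comparisons against the ISO bound strings are valid here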
+    assert bounds['start_time'] <= str(np.amin(tile_data.times))
+    assert bounds['end_time'] >= str(np.amax(tile_data.times))
+
+def filter_time(res, match_args):
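+    """Drop matchup pairs whose primary/secondary times differ by more than the
+    time tolerance tt_b (in seconds)."""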
+    filtered = []
+
+    for p in res:
+        if abs(matchup.iso_time_to_epoch(p[0].time) - matchup.iso_time_to_epoch(p[1].time)) <= match_args['tt_b'].value:
+            filtered.append(p)
+
+    return filtered
+
+# [(k,v),...] -> {k: [v,...],...}
+def to_map(res):
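+    """Group matchup pairs by primary point, e.g.
+    [(p1, s1), (p1, s2), (p2, s3)] -> {p1: [s1, s2], p2: [s3]}."""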
+    mapped = {}
+
+    for p in res:
+        k = p[0]
+        v = p[1]
+
+        if k not in mapped:
+            mapped[k] = [v]
+        else:
+            mapped[k].append(v)
+
+    return mapped
+
+def validate_point(point, time, lon, lat, value, name, cf_name=None, secondary_point=False):
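+    """Assert a matchup point's time, coordinates, and variable data; secondary
+    (in-situ) points carry two variables, so value and name are 2-element lists."""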
+    assert point.time == time
+    assert point.longitude == lon
+    assert point.latitude == lat
+    if not secondary_point:
+        assert point.data[0].variable_value == value
+        assert point.data[0].variable_name == name
+        assert point.data[0].cf_variable_name == cf_name
+    else:
+        assert point.data[0].variable_value == value[0]
+        assert point.data[0].variable_name == name[0]
+        assert point.data[1].variable_value == value[1]
+        assert point.data[1].variable_name == name[1]
+
+def test_matchup_direct_open(bounds, tile_service_direct):
+    tile_service_factory = mock.MagicMock()
+    tile_service_factory.return_value = tile_service_direct
+
+    tile_id = tile_service_direct.bounds_to_direct_tile_id(
+        bounds['min_lat'],
+        bounds['min_lon'],
+        bounds['max_lat'],
+        bounds['max_lon'],
+        bounds['start_time'],
+        bounds['end_time'],
+        dataset="MUR"
+    )
+
+    tiles = [tile_id]
+
+    with mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName') as mock_edge_endpoints:
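+        # Matchup arguments wrapped as mock Spark broadcast variables; tt_b is the
+        # time tolerance (seconds) and rt_b the radius tolerance (meters)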
+        match_args = dict(
+            tile_ids=tiles,
+            primary_b=MockSparkParam('MUR25-JPL-L4-GLOB-v04.2'),
+            secondary_b=MockSparkParam('ICOADS Release 3.0'),
+            parameter_b=MockSparkParam(None),
+            tt_b=MockSparkParam(43200),
+            rt_b=MockSparkParam(1000),
+            platforms_b=MockSparkParam('42'),
+            bounding_wkt_b=MockSparkParam('POLYGON((-100 20, -79 20, -79 30, -100 30, -100 20))'),
+            depth_min_b=MockSparkParam(-20.0),
+            depth_max_b=MockSparkParam(10.0),
+            tile_service_factory=tile_service_factory
+        )
+
+        test_dir = os.path.dirname(os.path.realpath(__file__))
+        test_data_dir = os.path.join(test_dir, 'data')
+
+        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
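+        # Replace the EDGE in-situ queries with canned JSON responses so the test
+        # never contacts an external in-situ service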
+        matchup.query_edge = lambda *args, **kwargs: json.load(open(os.path.join(test_data_dir, 'mock_response_direct.json')))
+        matchup.query_insitu_schema = lambda *args, **kwargs: json.load(open(os.path.join(test_data_dir, 'mock_insitu_schema.json')))
+
+        generator = matchup.match_satellite_to_insitu(**match_args)
+
+        result = to_map(filter_time(list(generator), match_args))
+
+        assert len(result) == 4
+
+        keys = list(result.keys())
+
+        validate_point(keys[0], '2018-08-17T09:00:00Z', -90.125, 27.625, 303.447998046875, 'analysed_sst', 'sea_surface_foundation_temperature')
+        validate_point(keys[1], '2018-08-21T09:00:00Z', -90.375, 28.125, 303.49200439453125, 'analysed_sst', 'sea_surface_foundation_temperature')
+        validate_point(keys[2], '2018-08-22T09:00:00Z', -90.125, 28.125, 303.3800048828125, 'analysed_sst', 'sea_surface_foundation_temperature')
+        validate_point(keys[3], '2018-08-27T09:00:00Z', -86.125, 27.625, 303.4729919433594, 'analysed_sst', 'sea_surface_foundation_temperature')
+
+        v0 = result[keys[0]]
+        v1 = result[keys[1]]
+        v2 = result[keys[2]]
+        v3 = result[keys[3]]
+
+        assert len(v0) == 6
+        assert len(v1) == 1
+        assert len(v2) == 1
+        assert len(v3) == 2
+
+        validate_point(v0[0], '2018-08-17T05:00:00Z', -90.13, 27.62, [30.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[1], '2018-08-17T05:30:00Z', -90.13, 27.62, [30.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[2], '2018-08-17T06:00:00Z', -90.13, 27.62, [30.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[3], '2018-08-17T06:30:00Z', -90.13, 27.63, [30.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[4], '2018-08-17T07:00:00Z', -90.13, 27.63, [30.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[5], '2018-08-17T07:30:00Z', -90.13, 27.63, [30.3,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+        validate_point(v1[0], '2018-08-21T01:00:00Z', -90.38, 28.12, [30.0,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+        validate_point(v2[0], '2018-08-22T01:00:00Z', -90.13, 28.12, [30.3,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+        validate_point(v3[0], '2018-08-27T12:30:00Z', -86.12, 27.62, [30.0,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v3[1], '2018-08-27T13:00:00Z', -86.13, 27.63, [30.0,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+def test_matchup_select_ds(bounds, tile_service_indirect):
+    tile_service_factory = mock.MagicMock()
+    tile_service_factory.return_value = tile_service_indirect
+
+    bounds['start_time'] = "2017-03-01T09:00:00+00:00"
+    bounds['end_time'] = "2017-06-01T00:00:00+00:00"
+
+    tile_id = tile_service_indirect.bounds_to_direct_tile_id(
+        bounds['min_lat'],
+        bounds['min_lon'],
+        bounds['max_lat'],
+        bounds['max_lon'],
+        bounds['start_time'],
+        bounds['end_time'],
+        dataset="OISSS"
+    )
+
+    tiles = [tile_id]
+
+    with mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName') as mock_edge_endpoints:
+        match_args = dict(
+            tile_ids=tiles,
+            primary_b=MockSparkParam('OISSS_L4_multimission_7day_v1'),
+            secondary_b=MockSparkParam('ICOADS Release 3.0'),
+            parameter_b=MockSparkParam(None),
+            tt_b=MockSparkParam(43200),
+            rt_b=MockSparkParam(1000),
+            platforms_b=MockSparkParam('42'),
+            bounding_wkt_b=MockSparkParam('POLYGON((-100 20, -79 20, -79 30, -100 30, -100 20))'),
+            depth_min_b=MockSparkParam(-20.0),
+            depth_max_b=MockSparkParam(10.0),
+            tile_service_factory=tile_service_factory
+        )
+
+        test_dir = os.path.dirname(os.path.realpath(__file__))
+        test_data_dir = os.path.join(test_dir, 'data')
+
+        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
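+        # Same EDGE stubbing as in test_matchup_direct_open, using the
+        # indirect-mode canned response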
+        matchup.query_edge = lambda *args, **kwargs: json.load(open(os.path.join(test_data_dir, 'mock_response_indirect.json')))
+        matchup.query_insitu_schema = lambda *args, **kwargs: json.load(open(os.path.join(test_data_dir, 'mock_insitu_schema.json')))
+
+        generator = matchup.match_satellite_to_insitu(**match_args)
+
+        result = to_map(filter_time(list(generator), match_args))
+
+        assert len(result) == 3
+
+        keys = list(result.keys())
+
+        validate_point(keys[0], '2017-04-06T00:00:00Z', -90.625, 23.625, 36.32952880859375,  'sss', 'sea_surface_salinity')
+        validate_point(keys[1], '2017-04-10T00:00:00Z', -90.875, 23.875, 36.40681838989258,  'sss', 'sea_surface_salinity')
+        validate_point(keys[2], '2017-04-14T00:00:00Z', -89.125, 24.125, 36.365089416503906, 'sss', 'sea_surface_salinity')
+
+        v0 = result[keys[0]]
+        v1 = result[keys[1]]
+        v2 = result[keys[2]]
+
+        assert len(v0) == 2
+        assert len(v1) == 2
+        assert len(v2) == 12
+
+        validate_point(v0[0], '2017-04-06T04:16:12Z', -90.62, 23.63, [25.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v0[1], '2017-04-06T05:01:12Z', -90.62, 23.63, [25.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+        validate_point(v1[0], '2017-04-09T23:30:00Z', -90.87, 23.88, [24.8,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v1[1], '2017-04-09T23:34:48Z', -90.88, 23.88, [24.8,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+
+        validate_point(v2[ 0], '2017-04-13T22:30:00Z', -89.12, 24.12, [24.6,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 1], '2017-04-13T23:30:00Z', -89.12, 24.12, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 2], '2017-04-13T23:45:00Z', -89.12, 24.12, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 3], '2017-04-14T00:00:00Z', -89.12, 24.12, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 4], '2017-04-14T00:15:00Z', -89.12, 24.13, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 5], '2017-04-14T00:30:00Z', -89.12, 24.13, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 6], '2017-04-14T01:15:00Z', -89.12, 24.13, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 7], '2017-04-14T02:00:00Z', -89.13, 24.13, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 8], '2017-04-14T02:15:00Z', -89.13, 24.12, [24.5,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[ 9], '2017-04-14T03:45:00Z', -89.13, 24.12, [24.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[10], '2017-04-14T04:45:00Z', -89.13, 24.12, [24.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)
+        validate_point(v2[11], '2017-04-14T09:15:00Z', -89.13, 24.12, [24.4,1], ['sea_water_temperature', 'sea_water_temperature_quality'], secondary_point=True)