Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/07 17:59:21 UTC

(incubator-sdap-nexus) branch develop updated: SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/develop by this push:
     new 90e0b00  SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)
90e0b00 is described below

commit 90e0b004ddaec3a508688f92fd506b757b80f41c
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Thu Mar 7 09:59:16 2024 -0800

    SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)
    
    * Separated NTS backends
    
    * n/a
    
    * More nts backend stuff
    
    * Working(?) np backend
    
    * Working(?) np backend
    
    * gitignore ini
    
    * ASF headers
    
    * First functioning test of 2 simultaneous backends
    
    * Removed accidentally committed ini files
    
    * Working zarr backend ds list
    
    + datasets are no longer case sensitive
    + handling for failed zarr ds opens (bad path, bad creds, &c)
    
    * Capture and handle NTS requests routed to backend that doesn't (yet) support them
    
    * analysis setup fails to find VERSION.txt when building locally
    
    * Implemented more NTS functions in zarr backend
    
    * Added misc backend time metrics record field in NCSH
    
    * fixes
    
    * Dynamic dataset management
    
    * Dynamic dataset management
    
    * Dataset management
    
    * TimeSeriesSpark support
    
    * Update backend dict on dataset mgmt query
    
    * Fixes and improvements
    
    * Adapted matchup to work with zarr backends
    
    * Zarr support
    
    - Distinct slices of time are now the default
    - No longer assuming+shaping as multivar tiles unless needed
    
    * DDAS adjustments
    
    * find_tile_by_polygon_and_most_recent_day_of_year impl
    
    * Don't sel by time if neither max nor min time are given
    
    * Fix not calling partial when needed
    
    * Pinned s3fs and fsspec versions
    
    * Fixed some dependencies to ensure image builds properly + s3fs works
    
    * Config override for backends
    
    * Deps update
    
    * Add metadata from Zarr collection to /list
    
    * Zarr: Probe lat order and flip if necessary
    
    * Strip quotes from variable names
    
    CM can sometimes publish with extra quotes resulting in KeyErrors
    
    * removed resultSizeLimit param from matchup
    
    * Add # of primaries/average secondaries to job output
    
    * rename to executionId
    
    * update changelog
    
    * add totalSecondaryMatched field to /job output
    
    * num unique secondaries addition
    
    * updated docs to use correct sea_water_temperature param name
    
    * bugfix
    
    * fix division by zero bug
    
    * add params to dataset management handler classes
    
    * add page number to default filename for matchup output
    
    * pagination improvements
    
    * removed debugging line
    
    * changelog
    
    * Update helm cassandra dependency (#289)
    
    * Update helm cassandra dependency
    
    * Bump default cassandra PV to 4
    
    * Bump default cassandra PV to 4 in tools
    
    * Changelog
    
    * Fixed small documentation issue
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
    
    * stac catalog
    
    * Updated openapi spec
    
    * move stac endpoints to matchup tag in openapi spec
    
    * SDAP-507 - Changes to remove geos sub-dependency
    
    * SDAP-507 - Changelog
    
    * SDAP-507 - Changes to remove geos sub-dependency
    
    * SDAP-507 - Changelog
    
    * delete instead of comment out
    
    * Revert "Update helm cassandra dependency (#289)"
    
    This reverts commit 1e8cc4e9d31d295e172c0db4bba61a5776642bea.
    
    * deleted disabled endpoint files
    
    * fix bug where still-running jobs failed /job endpoint due to missing metadata
    
    * Update .asf.yaml (#293)
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
    
    * Moved changelog entries
    
    * SDAP-472 changelog entries
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
    Co-authored-by: skorper <st...@gmail.com>
---
 .gitignore                                         |   3 +-
 CHANGELOG.md                                       |   4 +
 analysis/conda-requirements.txt                    |   5 +-
 analysis/setup.py                                  |   7 +-
 .../algorithms/DailyDifferenceAverage.py           |   3 +-
 .../algorithms/StandardDeviationSearch.py          |   2 +-
 .../DailyDifferenceAverageSpark.py                 |   6 +-
 .../webservice/algorithms_spark/HofMoellerSpark.py |   8 +-
 analysis/webservice/algorithms_spark/Matchup.py    |  16 +-
 .../algorithms_spark/NexusCalcSparkHandler.py      |   3 +
 .../webservice/algorithms_spark/TimeSeriesSpark.py |   5 +-
 analysis/webservice/config/web.ini                 |   2 +-
 analysis/webservice/management/Datasets.py         | 242 +++++++++
 .../webservice/management}/__init__.py             |   2 +
 .../nexus_tornado/app_builders/NexusAppBuilder.py  |   2 +-
 .../request/handlers/NexusRequestHandler.py        |  29 ++
 analysis/webservice/webmodel/NexusRequestObject.py |   6 +
 data-access/nexustiles/AbstractTileService.py      | 202 ++++++++
 .../nexustiles/{dao => backends}/__init__.py       |   1 +
 .../{dao => backends/nexusproto}/__init__.py       |   1 +
 .../nexusproto/backend.py}                         |  70 +--
 .../nexusproto}/config/datastores.ini.default      |   0
 .../nexusproto}/dao/CassandraProxy.py              |   0
 .../{ => backends/nexusproto}/dao/DynamoProxy.py   |   0
 .../nexusproto}/dao/ElasticsearchProxy.py          |   0
 .../{ => backends/nexusproto}/dao/S3Proxy.py       |   0
 .../{ => backends/nexusproto}/dao/SolrProxy.py     |   3 +-
 .../{ => backends/nexusproto}/dao/__init__.py      |   0
 .../nexustiles/{dao => backends/zarr}/__init__.py  |   1 +
 data-access/nexustiles/backends/zarr/backend.py    | 532 ++++++++++++++++++++
 .../__init__.py => config/datasets.ini.default}    |   4 +
 .../nexustiles/{dao/__init__.py => exception.py}   |   4 +
 data-access/nexustiles/nexustiles.py               | 557 ++++++++++++++-------
 data-access/requirements.txt                       |   9 +-
 data-access/setup.py                               |  18 +-
 35 files changed, 1484 insertions(+), 263 deletions(-)
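
For reference, this change lets Zarr-backed datasets be registered with a running SDAP instance through the new /datasets/* endpoints (added in analysis/webservice/management/Datasets.py below). The following is a minimal sketch of registering one; the host/port, dataset name, S3 URL, and variable/coordinate names are placeholders, and it assumes 'name' and 'path' are passed as query parameters (they are read with request.get_argument()).

    import json
    from urllib.request import Request, urlopen

    # Request body matching CONFIG_SCHEMA in webservice/management/Datasets.py.
    # A YAML body with Content-Type: application/yaml is also accepted.
    config = {
        "variable": "analysed_sst",   # placeholder variable name
        "coords": {"latitude": "lat", "longitude": "lon", "time": "time"},
        "aws": {"public": True}       # public bucket: no credentials needed
    }

    # Placeholder host/port, dataset name, and Zarr group URL.
    url = ("http://localhost:8083/datasets/add"
           "?name=my_zarr_ds&path=s3://my-bucket/my-zarr-group")

    request = Request(url, data=json.dumps(config).encode(),
                      headers={"Content-Type": "application/json"}, method="POST")
    urlopen(request)
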

diff --git a/.gitignore b/.gitignore
index 12ab2d6..23f8435 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,5 +4,6 @@
 *.idea
 *.DS_Store
 analysis/webservice/algorithms/doms/domsconfig.ini
-data-access/nexustiles/config/datastores.ini
+data-access/nexustiles/backends/nexusproto/config/datastores.ini
+data-access/nexustiles/config/datasets.ini
 venv/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0f873d..ff9e717 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,12 +10,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   - Added STAC Catalog endpoint for matchup outputs
 - SDAP-508: Added spatial extents to the satellite dataset entries in `/list` and `/cdmslist`
 - SDAP-505: Added support for DOMS insitu api 
+- SDAP-472:
+  - Support for Zarr backend (gridded data only)
+  - Dataset management endpoints for Zarr datasets
 ### Changed
 - SDAP-493: 
   - Updated /job endpoint to use `executionId` terminology for consistency with existing `/cdmsresults` endpoint
   - Updated /job endpoint with details about number of primary and secondary tiles.
 - SDAP-500: Improvements to SDAP Asynchronous Jobs
 - SDAP-499: Added page number to default filename for matchup output
+- SDAP-472: Overhauled `data-access` to support multiple backends for simultaneous support of multiple ARD formats
 ### Deprecated
 ### Removed
 - SDAP-493: 
diff --git a/analysis/conda-requirements.txt b/analysis/conda-requirements.txt
index e27bdea..22dff06 100644
--- a/analysis/conda-requirements.txt
+++ b/analysis/conda-requirements.txt
@@ -22,7 +22,8 @@ pytz==2021.1
 utm==0.6.0
 shapely==1.7.1
 backports.functools_lru_cache==1.6.1
-boto3==1.16.63
+boto3>=1.16.63
+botocore==1.24.21
 pillow==8.1.0
 mpld3=0.5.1
 tornado==6.1
@@ -33,4 +34,4 @@ gdal==3.2.1
 mock==4.0.3
 importlib_metadata==4.11.4
 #singledispatch==3.4.0.3
-
+schema
diff --git a/analysis/setup.py b/analysis/setup.py
index 8fbc617..2e09815 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -17,8 +17,11 @@
 import setuptools
 from subprocess import check_call, CalledProcessError
 
-with open('../VERSION.txt', 'r') as f:
-    __version__ = f.read()
+try:
+    with open('../VERSION.txt', 'r') as f:
+        __version__ = f.read()
+except:
+    __version__ = None
 
 
 try:
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
index 05274fc..c6c8495 100644
--- a/analysis/webservice/algorithms/DailyDifferenceAverage.py
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -21,7 +21,8 @@ from multiprocessing.dummy import Pool, Manager
 
 import numpy as np
 import pytz
-from nexustiles.nexustiles import NexusTileService, NexusTileServiceException
+from nexustiles.nexustiles import NexusTileService
+from nexustiles.exception import NexusTileServiceException
 from shapely.geometry import box
 
 from webservice.NexusHandler import nexus_handler
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
index ae0566f..26451cb 100644
--- a/analysis/webservice/algorithms/StandardDeviationSearch.py
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -19,7 +19,7 @@ import logging
 from datetime import datetime
 from functools import partial
 
-from nexustiles.nexustiles import NexusTileServiceException
+from nexustiles.exception import NexusTileServiceException
 from pytz import timezone
 
 from webservice.NexusHandler import nexus_handler
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
index b424578..12f7dee 100644
--- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -324,7 +324,7 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat
     for tile_id in tile_ids:
         # Get the dataset tile
         try:
-            dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id)
+            dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id, dataset.value)
         except NoDatasetTile:
             # This should only happen if all measurements in a tile become masked after applying the bounding polygon
             continue
@@ -348,12 +348,12 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat
     return chain(*diff_generators)
 
 
-def get_dataset_tile(tile_service, search_bounding_shape, tile_id):
+def get_dataset_tile(tile_service, search_bounding_shape, tile_id, dataset):
     the_time = datetime.now()
 
     try:
         # Load the dataset tile
-        dataset_tile = tile_service.find_tile_by_id(tile_id)[0]
+        dataset_tile = tile_service.find_tile_by_id(tile_id, ds=dataset)[0]
         # Mask it to the search domain
         dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0]
     except IndexError:
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index d50006a..e876a11 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -44,12 +44,12 @@ class HofMoellerCalculator(object):
     def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
 
         (latlon, tile_id, index,
-         min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+         min_lat, max_lat, min_lon, max_lon, dataset) = tile_in_spark
 
         tile_service = tile_service_factory()
         try:
             # Load the dataset tile
-            tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
+            tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback, ds=dataset)[0]
             calculation_start = datetime.now()
             # Mask it to the search domain
             tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
@@ -352,7 +352,7 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
 
         min_lon, min_lat, max_lon, max_lat = bbox.bounds
 
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
                              enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
                                                                                   ds, start_time, end_time,
                                                                                   metrics_callback=metrics_record.record_metrics,
@@ -408,7 +408,7 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
 
         min_lon, min_lat, max_lon, max_lat = bbox.bounds
 
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
                              enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
                                                                                   ds, start_time, end_time,
                                                                                   metrics_callback=metrics_record.record_metrics,
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index ee43624..e738a69 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -841,9 +841,9 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
     tile_service = tile_service_factory()
 
     # Determine the spatial temporal extents of this partition of tiles
-    tiles_bbox = tile_service.get_bounding_box(tile_ids)
-    tiles_min_time = tile_service.get_min_time(tile_ids)
-    tiles_max_time = tile_service.get_max_time(tile_ids)
+    tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value)
+    tiles_min_time = tile_service.get_min_time(tile_ids, ds=primary_b.value)
+    tiles_max_time = tile_service.get_max_time(tile_ids, ds=primary_b.value)
 
     # Increase spatial extents by the radius tolerance
     matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
@@ -922,7 +922,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
         edge_results = []
         for tile in matchup_tiles:
             # Retrieve tile data and convert to lat/lon projection
-            tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True)
+            tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True, ds=secondary_b.value)
             tile = tiles[0]
 
             valid_indices = tile.get_indices()
@@ -948,14 +948,14 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
 
     # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
     match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
-                                                      parameter_b.value, rt_b.value, aeqd_proj) for tile_id
-                        in tile_ids]
+                                                      parameter_b.value, rt_b.value, aeqd_proj, primary_b.value)
+                        for tile_id in tile_ids]
 
     return chain(*match_generators)
 
 
 def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
-                                  search_parameter, radius_tolerance, aeqd_proj):
+                                  search_parameter, radius_tolerance, aeqd_proj, primary_ds):
     from nexustiles.model.nexusmodel import NexusPoint
     from webservice.algorithms_spark.Matchup import DomsPoint  # Must import DomsPoint or Spark complains
 
@@ -963,7 +963,7 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
     try:
         the_time = datetime.now()
         tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
-                                                  tile_service.find_tile_by_id(tile_id))[0]
+                                                  tile_service.find_tile_by_id(tile_id, ds=primary_ds))[0]
         print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
     except IndexError:
         # This should only happen if all measurements in a tile become masked after applying the bounding polygon
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 4499773..e033467 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -362,6 +362,9 @@ class NexusCalcSparkHandler(NexusCalcHandler):
             SparkAccumulatorMetricsField(key='solr',
                                          description='Cumulative time to fetch data from Solr',
                                          accumulator=self._sc.accumulator(0)),
+            SparkAccumulatorMetricsField(key='backend',
+                                         description='Cumulative time to fetch data from external backend(s)',
+                                         accumulator=self._sc.accumulator(0)),
             SparkAccumulatorMetricsField(key='calculation',
                                          description='Cumulative time to do calculations',
                                          accumulator=self._sc.accumulator(0)),
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 90ae14d..804d3ec 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -483,8 +483,9 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
                                             timestamps[0],
                                             timestamps[-1],
                                             rows=5000,
-                                            metrics_callback=metrics_callback)
-    
+                                            metrics_callback=metrics_callback,
+                                            distinct=True)
+
     calculation_start = datetime.now()
 
     tile_dict = {}
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 8584975..a9e3dda 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -29,4 +29,4 @@ static_enabled=true
 static_dir=static
 
 [modules]
-module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
\ No newline at end of file
+module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms,webservice.management
\ No newline at end of file
diff --git a/analysis/webservice/management/Datasets.py b/analysis/webservice/management/Datasets.py
new file mode 100644
index 0000000..40b267f
--- /dev/null
+++ b/analysis/webservice/management/Datasets.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from yaml import load
+import json
+from webservice.NexusHandler import nexus_handler
+from nexustiles.nexustiles import NexusTileService
+from webservice.webmodel import NexusRequestObject, NexusProcessingException
+
+from schema import Schema, Or, SchemaError
+from schema import Optional as Opt
+
+from urllib.parse import urlparse
+try:
+    from yaml import CLoader as Loader
+except ImportError:
+    from yaml import Loader
+
+
+CONFIG_SCHEMA = Schema({
+    Or('variable', 'variables'): Or(str, [str]),
+    'coords': {
+        'latitude': str,
+        'longitude': str,
+        'time': str,
+        Opt('depth'): str
+    },
+    Opt('aws'): {
+        Opt('accessKeyID'): str,
+        Opt('secretAccessKey'): str,
+        'public': bool,
+        Opt('region'): str
+    }
+})
+
+
+class DatasetManagement:
+    @classmethod
+    def validate(cls):
+        pass
+
+    @staticmethod
+    def parse_config(request: NexusRequestObject):
+        content_type = request.get_headers()['Content-Type']
+
+        if content_type in ['application/json', 'application/x-json']:
+            config_dict = json.loads(request.get_request_body())
+        elif content_type == 'application/yaml':
+            config_dict = load(request.get_request_body(), Loader=Loader)
+        else:
+            raise NexusProcessingException(reason='Invalid Content-Type header', code=400)
+
+        try:
+            CONFIG_SCHEMA.validate(config_dict)
+
+            if 'aws' in config_dict:
+                if not config_dict['aws']['public']:
+                    if 'accessKeyID' not in config_dict['aws'] or 'secretAccessKey' not in config_dict['aws']:
+                        raise NexusProcessingException(
+                            reason='Must provide AWS creds for non-public bucket',
+                            code=400
+                        )
+        except SchemaError as e:
+            raise NexusProcessingException(
+                reason=str(e),
+                code=400
+            )
+
+        return config_dict
+
+
+class Response:
+    def __init__(self, response):
+        self.response = response if response is not None else {}
+
+    def toJson(self):
+        return json.dumps(self.response)
+
+
+@nexus_handler
+class DatasetAdd(DatasetManagement):
+    name = 'Add dataset'
+    path = '/datasets/add'
+    description = "Add new Zarr dataset to running SDAP instance"
+    params = {
+        "name": {
+            "name": "Dataset name",
+            "type": "string",
+            "description": "Name of new dataset to add"
+        },
+        "path": {
+            "name": "Path or URL",
+            "type": "string",
+            "description": "Path/URL of Zarr group"
+        },
+        "body": {
+            "name": "Request body",
+            "type": "application/json OR application/yaml",
+            "description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
+        }
+    }
+
+    def __init__(self, **args):
+        pass
+
+    def calc(self, request: NexusRequestObject, **args):
+        try:
+            config = DatasetManagement.parse_config(request)
+        except Exception as e:
+            raise NexusProcessingException(
+                reason=repr(e),
+                code=400
+            )
+
+        name = request.get_argument('name')
+
+        if name is None:
+            raise NexusProcessingException(
+                reason='Name argument must be provided',
+                code=400
+            )
+
+        path = request.get_argument('path')
+
+        if path is None:
+            raise NexusProcessingException(
+                reason='Path argument must be provided',
+                code=400
+            )
+
+        try:
+            if urlparse(path).scheme not in ['file','','s3']:
+                raise NexusProcessingException(
+                    reason='Dataset URL must be for a local file or S3 URL',
+                    code=400
+                )
+        except ValueError:
+            raise NexusProcessingException(
+                reason='Could not parse path URL', code=400
+            )
+
+        try:
+            NexusTileService.user_ds_add(name, path, config)
+        except Exception as e:
+            raise NexusProcessingException(
+                reason=repr(e),
+                code=500
+            )
+
+
+@nexus_handler
+class DatasetUpdate(DatasetManagement):
+    name = 'Update dynamically added dataset'
+    path = '/datasets/update'
+    description = "Update Zarr dataset in running SDAP instance"
+    params = {
+        "name": {
+            "name": "Dataset name",
+            "type": "string",
+            "description": "Name of dataset to update"
+        },
+        "body": {
+            "name": "Request body",
+            "type": "application/json OR application/yaml",
+            "description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
+        }
+    }
+
+    def __init__(self, **args):
+        pass
+
+    def calc(self, request: NexusRequestObject, **args):
+        try:
+            config = DatasetManagement.parse_config(request)
+        except Exception as e:
+            raise NexusProcessingException(
+                reason=repr(e),
+                code=400
+            )
+
+        name = request.get_argument('name')
+
+        if name is None:
+            raise NexusProcessingException(
+                reason='Name argument must be provided',
+                code=400
+            )
+
+        try:
+            return Response(NexusTileService.user_ds_update(name, config))
+        except Exception as e:
+            raise NexusProcessingException(
+                reason=repr(e),
+                code=500
+            )
+
+
+@nexus_handler
+class DatasetDelete(DatasetManagement):
+    name = 'Remove dataset'
+    path = '/datasets/remove'
+    description = "Remove Zarr dataset from running SDAP instance"
+    params = {
+        "name": {
+            "name": "Dataset name",
+            "type": "string",
+            "description": "Name of dataset to remove"
+        }
+    }
+
+    def __init__(self, **args):
+        pass
+
+    def calc(self, request: NexusRequestObject, **args):
+        name = request.get_argument('name')
+
+        if name is None:
+            raise NexusProcessingException(
+                reason='Name argument must be provided',
+                code=400
+            )
+
+        try:
+            return Response(NexusTileService.user_ds_delete(name))
+        except Exception as e:
+            raise NexusProcessingException(
+                reason=repr(e),
+                code=500
+            )
+
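
To illustrate CONFIG_SCHEMA above: a body for a non-public S3 bucket must also include accessKeyID and secretAccessKey (parse_config enforces this on top of the schema check). A sketch with placeholder variable/coordinate names and credentials:

    from schema import SchemaError
    from webservice.management.Datasets import CONFIG_SCHEMA

    # Hypothetical body for a dataset in a private bucket; 'variables' may also be
    # given as a single string under 'variable', and 'depth' is optional.
    body = {
        "variables": ["sea_surface_temperature", "sst_anomaly"],
        "coords": {"latitude": "lat", "longitude": "lon", "time": "time", "depth": "depth"},
        "aws": {
            "public": False,
            "accessKeyID": "AKIAXXXXXXXXXXXXXXXX",   # placeholder credentials
            "secretAccessKey": "xxxxxxxxxxxxxxxx",
            "region": "us-west-2"
        }
    }

    try:
        CONFIG_SCHEMA.validate(body)   # schema check only; the creds rule is applied in parse_config
    except SchemaError as e:
        print(e)
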
diff --git a/data-access/nexustiles/dao/__init__.py b/analysis/webservice/management/__init__.py
similarity index 93%
copy from data-access/nexustiles/dao/__init__.py
copy to analysis/webservice/management/__init__.py
index 6acb5d1..7c9f5ef 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/analysis/webservice/management/__init__.py
@@ -12,3 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from webservice.management.Datasets import DatasetAdd
\ No newline at end of file
diff --git a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
index afe7d69..0179858 100644
--- a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
@@ -53,7 +53,7 @@ class NexusAppBuilder:
         NexusHandler.executeInitializers(algorithm_config)
 
         self.log.info("Initializing request ThreadPool to %s" % max_request_threads)
-        tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
+        tile_service_factory = partial(NexusTileService, algorithm_config)
         handler_args_builder = HandlerArgsBuilder(
             max_request_threads,
             tile_service_factory,
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 95bddf4..7a55967 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -75,6 +75,35 @@ class NexusRequestHandler(tornado.web.RequestHandler):
         except Exception as e:
             self.async_onerror_callback(str(e), 500)
 
+    @tornado.gen.coroutine
+    def post(self):
+        self.logger.info("Received %s" % self._request_summary())
+
+        request = NexusRequestObject(self)
+
+        # create NexusCalcHandler which will process the request
+        instance = self.__clazz(**self._clazz_init_args)
+
+        try:
+            # process the request asynchronously on a different thread,
+            # the current tornado handler is still available to get other user requests
+            results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+
+            if results:
+                try:
+                    self.set_status(results.status_code)
+                except AttributeError:
+                    pass
+
+                renderer = NexusRendererFactory.get_renderer(request)
+                renderer.render(self, results)
+
+        except NexusProcessingException as e:
+            self.async_onerror_callback(e.reason, e.code)
+
+        except Exception as e:
+            self.async_onerror_callback(str(e), 500)
+
     def async_onerror_callback(self, reason, code=500):
         self.logger.error("Error processing request", exc_info=True)
 
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py
index bbd2828..1896236 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -35,6 +35,12 @@ class NexusRequestObject(StatsComputeOptions):
         self.requestHandler = reqHandler
         StatsComputeOptions.__init__(self)
 
+    def get_headers(self):
+        return self.requestHandler.request.headers
+
+    def get_request_body(self):
+        return self.requestHandler.request.body
+
     def get_argument(self, name, default=None):
         return self.requestHandler.get_argument(name, default=default)
 
diff --git a/data-access/nexustiles/AbstractTileService.py b/data-access/nexustiles/AbstractTileService.py
new file mode 100644
index 0000000..2046778
--- /dev/null
+++ b/data-access/nexustiles/AbstractTileService.py
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from functools import reduce
+
+import numpy as np
+import numpy.ma as ma
+
+
+class AbstractTileService(ABC):
+    # @staticmethod
+    # @abstractmethod
+    # def open_dataset(dataset_s, **kwargs):
+    #     pass
+
+    # @abstractmethod
+    # def try_connect(self) -> bool:
+    #     raise NotImplementedError()
+
+    def __init__(self, dataset_name):
+        self._name = dataset_name
+
+    @abstractmethod
+    def get_dataseries_list(self, simple=False):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tile_by_id(self, tile_id, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+                               metrics_callback=None, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
+        """
+        Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
+        polygon and the closest day of year.
+
+        For example:
+            given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
+            search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
+
+        Valid matches:
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
+
+        Invalid matches:
+            minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
+
+        :param bounding_polygon: The exact bounding polygon of tiles to search for
+        :param ds: The dataset name being searched
+        :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
+        :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
+        # Find tiles that fall in the given box in the Solr index
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
+        # Find tiles that fall within the polygon in the Solr index
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
+        """
+        Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
+        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+        :param ds: The dataset name to search
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :return: A list of tiles
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
+        """
+        The method will return tiles with the exact given bounds within the time range. It differs from
+        find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to
+        doing a polygon intersection with the given bounds.
+
+        :param bounds: (minx, miny, maxx, maxy) bounds to search for
+        :param ds: Dataset name to search
+        :param start_time: Start time to search (seconds since epoch)
+        :param end_time: End time to search (seconds since epoch)
+        :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
+        :return:
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_min_max_time_by_granule(self, ds, granule_name):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_dataset_overall_stats(self, ds):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_bounding_box(self, tile_ids):
+        """
+        Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
+        :param tile_ids: List of tile ids
+        :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_min_time(self, tile_ids, ds=None):
+        """
+        Get the minimum tile date from the list of tile ids
+        :param tile_ids: List of tile ids
+        :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+        :return: long time in seconds since epoch
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_max_time(self, tile_ids, ds=None):
+        """
+        Get the maximum tile date from the list of tile ids
+        :param tile_ids: List of tile ids
+        :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+        :return: long time in seconds since epoch
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
+        """
+        Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range.
+        :param bounding_polygon: The bounding polygon of tiles to search for
+        :param ds: The dataset name to search
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+        """
+        Return number of tiles that match search criteria.
+        :param ds: The dataset name to search
+        :param bounding_polygon: The polygon to search for tiles
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+        :return: number of tiles that match search criteria
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def fetch_data_for_tiles(self, *tiles):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def _metadata_store_docs_to_tiles(self, *store_docs):
+        raise NotImplementedError()
+
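
The abstract class above defines the contract every data backend now implements. Below is a minimal sketch of the subclassing pattern, using a hypothetical in-memory tile source; only two methods are shown, and the remaining abstract methods must also be overridden before the class can be instantiated. The real implementations are NexusprotoTileService and ZarrBackend in the diffs that follow.

    from nexustiles.AbstractTileService import AbstractTileService

    class InMemoryTileService(AbstractTileService):
        """Hypothetical backend over a pre-built dict of tiles."""

        def __init__(self, dataset_name, tiles):
            AbstractTileService.__init__(self, dataset_name)
            self._tiles = {t.tile_id: t for t in tiles}

        def get_dataseries_list(self, simple=False):
            return [{"shortName": self._name, "title": self._name, "type": "in-memory"}]

        def find_tile_by_id(self, tile_id, **kwargs):
            return [self._tiles[tile_id]]

        # find_tiles_by_id, find_tiles_in_box, get_min_time, get_max_time, etc.
        # would be implemented the same way against the in-memory dict.
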
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/__init__.py
@@ -12,3 +12,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/nexusproto/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/nexusproto/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/nexusproto/__init__.py
@@ -12,3 +12,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/backends/nexusproto/backend.py
similarity index 91%
copy from data-access/nexustiles/nexustiles.py
copy to data-access/nexustiles/backends/nexusproto/backend.py
index a3aa61e..690b109 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/backends/nexusproto/backend.py
@@ -18,7 +18,7 @@ import logging
 import sys
 import json
 from datetime import datetime
-from functools import wraps, reduce
+from functools import reduce
 
 import numpy as np
 import numpy.ma as ma
@@ -32,59 +32,23 @@ from .dao import S3Proxy
 from .dao import SolrProxy
 from .dao import ElasticsearchProxy
 
-from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from nexustiles.model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from nexustiles.exception import NexusTileServiceException
+from nexustiles.AbstractTileService import AbstractTileService
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
-logger = logging.getLogger("testing")
+logger = logging.getLogger(__name__)
 
 
-def tile_data(default_fetch=True):
-    def tile_data_decorator(func):
-        @wraps(func)
-        def fetch_data_for_func(*args, **kwargs):
-            metadatastore_start = datetime.now()
-            metadatastore_docs = func(*args, **kwargs)
-            metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds()
-            tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs)
-
-            cassandra_duration = 0
-            if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
-                if len(tiles) > 0:
-                    cassandra_start = datetime.now()
-                    args[0].fetch_data_for_tiles(*tiles)
-                    cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
-
-            if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
-                try:
-                    kwargs['metrics_callback'](cassandra=cassandra_duration,
-                                               metadatastore=metadatastore_duration,
-                                               num_tiles=len(tiles))
-                except Exception as e:
-                    logger.error("Metrics callback '{}'raised an exception. Will continue anyway. " +
-                                 "The exception was: {}".format(kwargs['metrics_callback'], e))
-            return tiles
-
-        return fetch_data_for_func
-
-    return tile_data_decorator
-
-
-class NexusTileServiceException(Exception):
-    pass
-
-
-class NexusTileService(object):
+class NexusprotoTileService(AbstractTileService):
     def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
+        AbstractTileService.__init__(self, None)
         self._datastore = None
         self._metadatastore = None
 
         self._config = configparser.RawConfigParser()
-        self._config.read(NexusTileService._get_config_files('config/datastores.ini'))
+        self._config.read(NexusprotoTileService._get_config_files('config/datastores.ini'))
 
         if config:
             self.override_config(config)
@@ -120,11 +84,9 @@ class NexusTileService(object):
         else:
             return self._metadatastore.get_data_series_list()
 
-    @tile_data()
     def find_tile_by_id(self, tile_id, **kwargs):
         return self._metadatastore.find_tile_by_id(tile_id)
 
-    @tile_data()
     def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
         return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
 
@@ -139,7 +101,6 @@ class NexusTileService(object):
             metrics_callback(solr=duration)
         return result
 
-    @tile_data()
     def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
         """
         Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
@@ -171,18 +132,15 @@ class NexusTileService(object):
 
         return tile
 
-    @tile_data()
     def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
         return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
                                                                  rows=5000,
                                                                  **kwargs)
 
-    @tile_data()
     def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
         return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
                                                                      **kwargs)
 
-    @tile_data()
     def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
         # Find tiles that fall in the given box in the Solr index
         if type(start_time) is datetime:
@@ -192,7 +150,6 @@ class NexusTileService(object):
         return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
                                                                      end_time, **kwargs)
 
-    @tile_data()
     def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
         # Find tiles that fall within the polygon in the Solr index
         if 'sort' in list(kwargs.keys()):
@@ -203,7 +160,6 @@ class NexusTileService(object):
                                                                               **kwargs)
         return tiles
 
-    @tile_data()
     def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
         """
         Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
@@ -232,7 +188,6 @@ class NexusTileService(object):
 
         return tiles
 
-    @tile_data()
     def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
         """
         The method will return tiles with the exact given bounds within the time range. It differs from
@@ -251,7 +206,6 @@ class NexusTileService(object):
                                                                end_time)
         return tiles
 
-    @tile_data()
     def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
         return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
                                                                    rows=5000,
@@ -315,6 +269,9 @@ class NexusTileService(object):
         """
         tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'],
                                       fetch_data=False, rows=len(tile_ids))
+
+        tiles = self._metadata_store_docs_to_tiles(*tiles)
+
         polys = []
         for tile in tiles:
             polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat))
@@ -556,8 +513,8 @@ class NexusTileService(object):
                     var_names = [store_doc['tile_var_name_s']]
 
                 standard_name = store_doc.get(
-                        'tile_standard_name_s',
-                        json.dumps([None] * len(var_names))
+                    'tile_standard_name_s',
+                    json.dumps([None] * len(var_names))
                 )
                 if '[' in standard_name:
                     standard_names = json.loads(standard_name)
@@ -573,7 +530,6 @@ class NexusTileService(object):
             except KeyError:
                 pass
 
-
             if 'tile_var_name_ss' in store_doc:
                 tile.variables = []
                 for var_name in store_doc['tile_var_name_ss']:
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/backends/nexusproto/config/datastores.ini.default
similarity index 100%
rename from data-access/nexustiles/config/datastores.ini.default
rename to data-access/nexustiles/backends/nexusproto/config/datastores.ini.default
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/CassandraProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
diff --git a/data-access/nexustiles/dao/DynamoProxy.py b/data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/DynamoProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py
diff --git a/data-access/nexustiles/dao/ElasticsearchProxy.py b/data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/ElasticsearchProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py
diff --git a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py
similarity index 100%
rename from data-access/nexustiles/dao/S3Proxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py
diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
similarity index 99%
rename from data-access/nexustiles/dao/SolrProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
index a072c6b..e466ad4 100644
--- a/data-access/nexustiles/dao/SolrProxy.py
+++ b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
@@ -217,7 +217,8 @@ class SolrProxy(object):
             l.append({
                 "shortName": g,
                 "title": g,
-                "tileCount": v
+                "tileCount": v,
+                "type": 'nexusproto'
             })
         l = sorted(l, key=lambda entry: entry["title"])
         return l
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/nexusproto/dao/__init__.py
similarity index 100%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/nexusproto/dao/__init__.py
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/zarr/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/zarr/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/zarr/__init__.py
@@ -12,3 +12,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
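
The ZarrBackend added below is wired up by NexusTileService (via datasets.ini or the /datasets endpoints), but it can also be exercised directly. A sketch against a hypothetical local Zarr group, where the path, dataset name, and variable/coordinate names are placeholders:

    from nexustiles.backends.zarr.backend import ZarrBackend

    config = {
        "variable": "analysed_sst",   # placeholder variable name
        "coords": {"latitude": "lat", "longitude": "lon", "time": "time"},
    }

    # Local 'file://' (or bare) paths are opened directly; 's3://' paths go through s3fs.
    backend = ZarrBackend("my_zarr_ds", "file:///data/my-zarr-group", config)
    print(backend.get_dataseries_list(simple=True))
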
diff --git a/data-access/nexustiles/backends/zarr/backend.py b/data-access/nexustiles/backends/zarr/backend.py
new file mode 100644
index 0000000..e1d0a0c
--- /dev/null
+++ b/data-access/nexustiles/backends/zarr/backend.py
@@ -0,0 +1,532 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sys
+from datetime import datetime
+from urllib.parse import urlparse
+
+import numpy as np
+import numpy.ma as ma
+import s3fs
+import xarray as xr
+from nexustiles.AbstractTileService import AbstractTileService
+from nexustiles.exception import NexusTileServiceException
+from nexustiles.model.nexusmodel import Tile, BBox, TileVariable
+from pytz import timezone
+from shapely.geometry import MultiPolygon, box
+from yarl import URL
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+
+class ZarrBackend(AbstractTileService):
+    def __init__(self, dataset_name, path, config=None):
+        AbstractTileService.__init__(self, dataset_name)
+        self.__config = config if config is not None else {}
+
+        logger.info(f'Opening zarr backend at {path} for dataset {self._name}')
+
+        url = urlparse(path)
+
+        self.__url = path
+
+        self.__store_type = url.scheme
+        self.__host = url.netloc
+        self.__path = url.path
+
+        if 'variable' in config:
+            data_vars = config['variable']
+        elif 'variables' in config:
+            data_vars = config['variables']
+        else:
+            raise KeyError('Data variables not provided in config')
+
+        if isinstance(data_vars, str):
+            self.__variables = [data_vars]
+        elif isinstance(data_vars, list):
+            self.__variables = data_vars
+        else:
+            raise TypeError(f'Improper type for variables config: {type(data_vars)}')
+
+        self.__variables = [v.strip('\"\'') for v in self.__variables]
+
+        self.__longitude = config['coords']['longitude']
+        self.__latitude = config['coords']['latitude']
+        self.__time = config['coords']['time']
+
+        self.__depth = config['coords'].get('depth')
+
+        if self.__store_type in ['', 'file']:
+            store = self.__path
+        elif self.__store_type == 's3':
+            try:
+                aws_cfg = self.__config['aws']
+
+                if aws_cfg['public']:
+                    # region = aws_cfg.get('region', 'us-west-2')
+                    # store = f'https://{self.__host}.s3.{region}.amazonaws.com{self.__path}'
+                    s3 = s3fs.S3FileSystem(True)
+                    store = s3fs.S3Map(root=path, s3=s3, check=False)
+                else:
+                    s3 = s3fs.S3FileSystem(False, key=aws_cfg['accessKeyID'], secret=aws_cfg['secretAccessKey'])
+                    store = s3fs.S3Map(root=path, s3=s3, check=False)
+            except Exception as e:
+                logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}')
+                raise NexusTileServiceException(f'Cannot open S3 dataset ({e})')
+        else:
+            raise ValueError(self.__store_type)
+
+        try:
+            self.__ds: xr.Dataset = xr.open_zarr(store, consolidated=True)
+        except Exception as e:
+            logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}')
+            raise NexusTileServiceException(f'Cannot open dataset ({e})')
+
+        lats = self.__ds[self.__latitude].to_numpy()
+        delta = lats[1] - lats[0]
+
+        if delta < 0:
+            logger.warning(f'Latitude coordinate for {self._name} is in descending order. Flipping it to ascending')
+            self.__ds = self.__ds.isel({self.__latitude: slice(None, None, -1)})
+
+    def get_dataseries_list(self, simple=False):
+        ds = {
+            "shortName": self._name,
+            "title": self._name,
+            "type": "zarr"
+        }
+
+        if not simple:
+            min_date = self.get_min_time([])
+            max_date = self.get_max_time([])
+            ds['start'] = min_date
+            ds['end'] = max_date
+            ds['iso_start'] = datetime.utcfromtimestamp(min_date).strftime(ISO_8601)
+            ds['iso_end'] = datetime.utcfromtimestamp(max_date).strftime(ISO_8601)
+
+            ds['metadata'] = dict(self.__ds.attrs)
+
+        return [ds]
+
+    def find_tile_by_id(self, tile_id, **kwargs):
+        return [tile_id]
+
+    def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
+        return tile_ids
+
+    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+                               metrics_callback=None, **kwargs):
+        start = datetime.now()
+
+        if not isinstance(start_time, datetime):
+            start_time = datetime.utcfromtimestamp(start_time)
+
+        if not isinstance(end_time, datetime):
+            end_time = datetime.utcfromtimestamp(end_time)
+
+        sel = {
+            self.__latitude: slice(min_lat, max_lat),
+            self.__longitude: slice(min_lon, max_lon),
+            self.__time: slice(start_time, end_time)
+        }
+
+        times = self.__ds.sel(sel)[self.__time].to_numpy()
+
+        if np.issubdtype(times.dtype, np.datetime64):
+            times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int)
+
+        times = sorted(times.tolist())
+
+        if metrics_callback:
+            metrics_callback(backend=(datetime.now() - start).total_seconds())
+
+        return times
+
+    def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
+        """
+        Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
+        polygon and the closest day of year.
+
+        For example:
+            given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
+            search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
+
+        Valid matches:
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
+
+        Invalid matches:
+            minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
+            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
+
+        :param bounding_polygon: The exact bounding polygon of tiles to search for
+        :param ds: The dataset name being searched
+        :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
+        :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
+        """
+
+        times = self.__ds[self.__time].to_numpy()
+
+        to_doy = lambda dt: datetime.utcfromtimestamp(int(dt)).timetuple().tm_yday
+
+        vfunc = np.vectorize(to_doy)
+        days_of_year = vfunc(times.astype(datetime) / 1e9)
+
+        try:
+            time = times[np.where(days_of_year <= day_of_year)[0][-1]].astype(datetime) / 1e9
+        except IndexError:
+            raise NexusTileServiceException(reason='No tiles matched')
+
+        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
+
+        return self.find_tiles_in_box(
+            min_lat, max_lat, min_lon, max_lon, ds, time, time
+        )
+
+    def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, dataset, time, time, **kwargs)
+
+    def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
+        return self.find_tiles_in_polygon(bounding_polygon, dataset, time, time, **kwargs)
+
+    def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
+        if type(start_time) is datetime:
+            start_time = (start_time - EPOCH).total_seconds()
+        if type(end_time) is datetime:
+            end_time = (end_time - EPOCH).total_seconds()
+
+        params = {
+            'min_lat': min_lat,
+            'max_lat': max_lat,
+            'min_lon': min_lon,
+            'max_lon': max_lon
+        }
+
+        times = None
+
+        if start_time is not None and end_time is not None and 0 <= start_time <= end_time:
+            if kwargs.get('distinct', True):
+                times_asc = self.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time)
+                times = [(t, t) for t in times_asc]
+            else:
+                times = [(start_time, end_time)]
+
+        if 'depth' in kwargs:
+            params['depth'] = kwargs['depth']
+        elif 'min_depth' in kwargs or 'max_depth' in kwargs:
+            params['min_depth'] = kwargs.get('min_depth')
+            params['max_depth'] = kwargs.get('max_depth')
+
+        if times:
+            return [ZarrBackend.__to_url(self._name, min_time=t[0], max_time=t[1], **params) for t in times]
+        else:
+            return [ZarrBackend.__to_url(self._name, **params)]
+
+    def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=None, end_time=None, **kwargs):
+        # Gridded zarr subsetting is exact, so reduce the polygon to its bounding box
+        bounds = bounding_polygon.bounds
+
+        min_lon = bounds[0]
+        min_lat = bounds[1]
+        max_lon = bounds[2]
+        max_lat = bounds[3]
+
+        return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
+
+    def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
+        """
+        Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
+        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+        :param ds: The dataset name to search
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :return: A list of tiles
+        """
+        raise NotImplementedError()
+
+    def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
+        """
+        The method will return tiles with the exact given bounds within the time range. It differs from
+        find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to
+        doing a polygon intersection with the given bounds.
+
+        :param bounds: (minx, miny, maxx, maxy) bounds to search for
+        :param ds: Dataset name to search
+        :param start_time: Start time to search (seconds since epoch)
+        :param end_time: End time to search (seconds since epoch)
+        :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
+        :return:
+        """
+        min_lon = bounds[0]
+        min_lat = bounds[1]
+        max_lon = bounds[2]
+        max_lat = bounds[3]
+
+        return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
+
+    def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        # Due to the precise nature of gridded Zarr's subsetting, it doesn't make sense to have a boundary region like
+        # this
+        return []
+
+    def get_min_max_time_by_granule(self, ds, granule_name):
+        raise NotImplementedError()
+
+    def get_dataset_overall_stats(self, ds):
+        raise NotImplementedError()
+
+    def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+        raise NotImplementedError()
+
+    def get_bounding_box(self, tile_ids):
+        """
+        Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
+        :param tile_ids: List of tile ids
+        :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
+        """
+
+        bounds = [
+            (
+                float(URL(u).query['min_lon']),
+                float(URL(u).query['min_lat']),
+                float(URL(u).query['max_lon']),
+                float(URL(u).query['max_lat'])
+            )
+            for u in tile_ids
+        ]
+
+        poly = MultiPolygon([box(*b) for b in bounds])
+
+        return box(*poly.bounds)
+
+    def __get_ds_min_max_date(self):
+        min_date = self.__ds[self.__time].min().to_numpy()
+        max_date = self.__ds[self.__time].max().to_numpy()
+
+        if np.issubdtype(min_date.dtype, np.datetime64):
+            min_date = ((min_date - np.datetime64(EPOCH)) / 1e9).astype(int).item()
+
+        if np.issubdtype(max_date.dtype, np.datetime64):
+            max_date = ((max_date - np.datetime64(EPOCH)) / 1e9).astype(int).item()
+
+        return min_date, max_date
+
+    def get_min_time(self, tile_ids, ds=None):
+        """
+        Get the minimum tile date from the list of tile ids
+        :param tile_ids: List of tile ids
+        :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+        :return: long time in seconds since epoch
+        """
+        times = list(filter(lambda x: x is not None, [int(URL(tid).query['min_time']) for tid in tile_ids]))
+
+        if len(times) == 0:
+            min_date, max_date = self.__get_ds_min_max_date()
+            return min_date
+        else:
+            return min(times)
+
+    def get_max_time(self, tile_ids, ds=None):
+        """
+        Get the maximum tile date from the list of tile ids
+        :param tile_ids: List of tile ids
+        :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+        :return: long time in seconds since epoch
+        """
+        times = list(filter(lambda x: x is not None, [int(URL(tid).query['max_time']) for tid in tile_ids]))
+
+        if len(times) == 0:
+            min_date, max_date = self.__get_ds_min_max_date()
+            return max_date
+        else:
+            return max(times)
+
+    def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
+        """
+        Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range.
+        :param bounding_polygon: The bounding polygon of tiles to search for
+        :param ds: The dataset name to search
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon
+        """
+        raise NotImplementedError()
+
+    def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+        """
+        Return number of tiles that match search criteria.
+        :param ds: The dataset name to search
+        :param bounding_polygon: The polygon to search for tiles
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+        :return: number of tiles that match search criteria
+        """
+        raise NotImplementedError()
+
+    def fetch_data_for_tiles(self, *tiles):
+        for tile in tiles:
+            self.__fetch_data_for_tile(tile)
+
+        return tiles
+
+    def __fetch_data_for_tile(self, tile: Tile):
+        bbox: BBox = tile.bbox
+
+        min_lat = None
+        min_lon = None
+        max_lat = None
+        max_lon = None
+
+        min_time = tile.min_time
+        max_time = tile.max_time
+
+        # if min_time:
+        #     min_time = datetime.utcfromtimestamp(min_time)
+        #
+        # if max_time:
+        #     max_time = datetime.utcfromtimestamp(max_time)
+
+        if bbox:
+            min_lat = bbox.min_lat
+            min_lon = bbox.min_lon
+            max_lat = bbox.max_lat
+            max_lon = bbox.max_lon
+
+        sel_g = {
+            self.__latitude: slice(min_lat, max_lat),
+            self.__longitude: slice(min_lon, max_lon),
+        }
+
+        sel_t = {}
+
+        if min_time is None and max_time is None:
+            sel_t = None
+            method = None
+        elif min_time == max_time:
+            sel_t[self.__time] = [min_time]  # List, otherwise self.__time dim will be dropped
+            method = 'nearest'
+        else:
+            sel_t[self.__time] = slice(min_time, max_time)
+            method = None
+
+        tile.variables = [
+            TileVariable(v, v) for v in self.__variables
+        ]
+
+        matched = self.__ds.sel(sel_g)
+
+        if sel_t is not None:
+            matched = matched.sel(sel_t, method=method)
+
+        tile.latitudes = ma.masked_invalid(matched[self.__latitude].to_numpy())
+        tile.longitudes = ma.masked_invalid(matched[self.__longitude].to_numpy())
+
+        times = matched[self.__time].to_numpy()
+
+        if np.issubdtype(times.dtype, np.datetime64):
+            times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int)
+
+        tile.times = ma.masked_invalid(times)
+
+        var_data = [matched[var].to_numpy() for var in self.__variables]
+
+        if len(self.__variables) > 1:
+            tile.data = ma.masked_invalid(var_data)
+            tile.is_multi = True
+        else:
+            tile.data = ma.masked_invalid(var_data[0])
+            tile.is_multi = False
+
+
+    def _metadata_store_docs_to_tiles(self, *store_docs):
+        return [ZarrBackend.__nts_url_to_tile(d) for d in store_docs]
+
+    @staticmethod
+    def __nts_url_to_tile(nts_url):
+        tile = Tile()
+
+        url = URL(nts_url)
+
+        tile.tile_id = nts_url
+
+        try:
+            min_lat = float(url.query['min_lat'])
+            min_lon = float(url.query['min_lon'])
+            max_lat = float(url.query['max_lat'])
+            max_lon = float(url.query['max_lon'])
+
+            tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon)
+        except KeyError:
+            pass
+
+        tile.dataset = url.path
+        tile.dataset_id = url.path
+
+        try:
+            # tile.min_time = int(url.query['min_time'])
+            tile.min_time = datetime.utcfromtimestamp(int(url.query['min_time']))
+        except KeyError:
+            pass
+
+        try:
+            # tile.max_time = int(url.query['max_time'])
+            tile.max_time = datetime.utcfromtimestamp(int(url.query['max_time']))
+        except KeyError:
+            pass
+
+        tile.meta_data = {}
+
+        return tile
+
+    @staticmethod
+    def __to_url(dataset, **kwargs):
+        if 'dataset' in kwargs:
+            del kwargs['dataset']
+
+        if 'ds' in kwargs:
+            del kwargs['ds']
+
+        params = {}
+
+        # If any params are numpy dtypes, extract them to base python types
+        for kw in kwargs:
+            v = kwargs[kw]
+
+            if v is None:
+                continue
+
+            if isinstance(v, np.generic):
+                v = v.item()
+
+            params[kw] = v
+
+        return str(URL.build(
+            scheme='nts',
+            host='',
+            path=dataset,
+            query=params
+        ))
+
+
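Note for reviewers: ZarrBackend keeps no per-tile index. find_tiles_in_box and its siblings synthesize "tile ids" as nts:// URLs whose query string carries the requested bounds, and fetch_data_for_tiles later slices the underlying xarray Dataset using those bounds. The sketch below shows the shape of the per-dataset config the constructor reads and how bounds are recovered from such a URL (as get_bounding_box does); the dataset name, store path, variable and coordinate names, and the exact URL layout are illustrative only, not taken from a real collection.

    from yarl import URL

    # Hypothetical per-dataset config, matching the keys ZarrBackend.__init__ expects
    config = {
        'variable': 'analysed_sst',                 # or 'variables': ['...', '...']
        'coords': {'latitude': 'lat', 'longitude': 'lon', 'time': 'time'},
        # 'aws': {'public': True},                  # only consulted for s3:// paths
    }
    # backend = ZarrBackend('MY_ZARR_DS', 's3://my-bucket/my-dataset.zarr', config)

    # A tile id of this general shape is produced by ZarrBackend.__to_url; the bounds
    # travel in the query string and are read back with yarl, as get_bounding_box does.
    tile_id = ('nts://MY_ZARR_DS?min_lat=-10.0&max_lat=10.0'
               '&min_lon=100.0&max_lon=120.0&min_time=1577836800&max_time=1577836800')
    q = URL(tile_id).query
    bbox = (float(q['min_lon']), float(q['min_lat']), float(q['max_lon']), float(q['max_lat']))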
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/config/datasets.ini.default
similarity index 93%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/config/datasets.ini.default
index 6acb5d1..9f586cf 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/config/datasets.ini.default
@@ -12,3 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+[solr]
+host=http://localhost:8983
+core=nexusdatasets
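The [solr] section above points data-access at the Solr core that stores dataset registrations. The nexustiles changes below consume it roughly as sketched here; this is a simplification (the real code also honours an optional time_out option and caches the connection per thread), and the ini path is illustrative.

    import configparser

    import pysolr

    cfg = configparser.RawConfigParser()
    cfg.read('datasets.ini')                          # illustrative path

    # e.g. http://localhost:8983/solr/nexusdatasets with the defaults above
    solr_url = '%s/solr/%s' % (cfg.get('solr', 'host'), cfg.get('solr', 'core'))
    solrcon = pysolr.Solr(solr_url)
    docs = solrcon.search('*:*', sort='id asc').docs  # one document per registered dataset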
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/exception.py
similarity index 86%
rename from data-access/nexustiles/dao/__init__.py
rename to data-access/nexustiles/exception.py
index 6acb5d1..d6ed2c6 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/exception.py
@@ -12,3 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+class NexusTileServiceException(Exception):
+    def __init__(self, reason):
+        Exception.__init__(self, reason)
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index a3aa61e..b4fd6bb 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -14,25 +14,33 @@
 # limitations under the License.
 
 import configparser
+import json
 import logging
 import sys
-import json
+import threading
 from datetime import datetime
-from functools import wraps, reduce
+from functools import reduce, wraps
+from time import sleep
+from typing import Dict, Union
 
 import numpy as np
 import numpy.ma as ma
 import pkg_resources
+import pysolr
 from pytz import timezone, UTC
-from shapely.geometry import MultiPolygon, box
+from shapely.geometry import box
+from webservice.webmodel import DatasetNotFoundException, NexusProcessingException
+from webservice.NexusHandler import nexus_initializer
+from yarl import URL
+
+from .AbstractTileService import AbstractTileService
+from .backends.nexusproto.backend import NexusprotoTileService
+from .backends.zarr.backend import ZarrBackend
+from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
 
-from .dao import CassandraProxy
-from .dao import DynamoProxy
-from .dao import S3Proxy
-from .dao import SolrProxy
-from .dao import ElasticsearchProxy
+from .exception import NexusTileServiceException
 
-from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from requests.structures import CaseInsensitiveDict
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 
@@ -40,7 +48,7 @@ logging.basicConfig(
     level=logging.INFO,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
     datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
-logger = logging.getLogger("testing")
+logger = logging.getLogger("nexus-tile-svc")
 
 
 def tile_data(default_fetch=True):
@@ -50,13 +58,27 @@ def tile_data(default_fetch=True):
             metadatastore_start = datetime.now()
             metadatastore_docs = func(*args, **kwargs)
             metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds()
-            tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs)
+
+            # Try to determine source dataset to route calls to proper backend
+            guessed_dataset = None
+
+            if 'ds' in kwargs:
+                guessed_dataset = kwargs['ds']
+            elif 'dataset' in kwargs:
+                guessed_dataset = kwargs['dataset']
+            else:
+                for arg in args:
+                    if isinstance(arg, str) and arg in NexusTileService.backends:
+                        guessed_dataset = arg
+                        break
+
+            tiles = NexusTileService._get_backend(guessed_dataset)._metadata_store_docs_to_tiles(*metadatastore_docs)
 
             cassandra_duration = 0
             if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
                 if len(tiles) > 0:
                     cassandra_start = datetime.now()
-                    args[0].fetch_data_for_tiles(*tiles)
+                    NexusTileService._get_backend(guessed_dataset).fetch_data_for_tiles(*tiles)
                     cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
 
             if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
@@ -74,38 +96,280 @@ def tile_data(default_fetch=True):
     return tile_data_decorator
 
 
-class NexusTileServiceException(Exception):
-    pass
+def catch_not_implemented(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except NotImplementedError:
+            raise NexusTileServiceException('Action unsupported by backend')
+
+    return wrapper
 
 
-class NexusTileService(object):
-    def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
-        self._datastore = None
-        self._metadatastore = None
+SOLR_LOCK = threading.Lock()
+DS_LOCK = threading.Lock()
+thread_local = threading.local()
+
+
+@nexus_initializer
+class NTSInitializer:
+    def __init__(self):
+        self._log = logger.getChild('init')
+
+    def init(self, config):
+        self._log.info('*** RUNNING NTS INITIALIZATION ***')
+        NexusTileService(config)
+
+
+class NexusTileService:
+    backends: Dict[Union[None, str], Dict[str, Union[AbstractTileService, bool]]] = {}
+
+    ds_config = None
+
+    __update_thread = None
+
+    @staticmethod
+    def __update_datasets_loop():
+        while True:
+            with DS_LOCK:
+                NexusTileService._update_datasets()
+            sleep(3600)
 
+    def __init__(self, config=None):
         self._config = configparser.RawConfigParser()
-        self._config.read(NexusTileService._get_config_files('config/datastores.ini'))
+        self._config.read(NexusTileService._get_config_files('config/datasets.ini'))
+
+        self._alg_config = config
+
+        if not NexusTileService.backends:
+            NexusTileService.ds_config = configparser.RawConfigParser()
+            NexusTileService.ds_config.read(NexusTileService._get_config_files('config/datasets.ini'))
+
+            default_backend = {"backend": NexusprotoTileService(False, False, config), 'up': True}
+
+            NexusTileService.backends[None] = default_backend
+            NexusTileService.backends['__nexusproto__'] = default_backend
 
         if config:
             self.override_config(config)
 
-        if not skipDatastore:
-            datastore = self._config.get("datastore", "store")
-            if datastore == "cassandra":
-                self._datastore = CassandraProxy.CassandraProxy(self._config)
-            elif datastore == "s3":
-                self._datastore = S3Proxy.S3Proxy(self._config)
-            elif datastore == "dynamo":
-                self._datastore = DynamoProxy.DynamoProxy(self._config)
+        if not NexusTileService.__update_thread:
+            NexusTileService.__update_thread = threading.Thread(
+                target=NexusTileService.__update_datasets_loop,
+                name='dataset_update',
+                daemon=True
+            )
+
+            logger.info('Starting dataset refresh thread')
+
+            NexusTileService.__update_thread.start()
+
+    @staticmethod
+    def _get_backend(dataset_s) -> AbstractTileService:
+        with DS_LOCK:
+            if dataset_s not in NexusTileService.backends:
+                logger.warning(f'Dataset {dataset_s} not currently loaded. Checking to see if it was recently '
+                               f'added')
+                NexusTileService._update_datasets()
+                if dataset_s not in NexusTileService.backends:
+                    raise DatasetNotFoundException(reason=f'Dataset {dataset_s} is not currently loaded/ingested')
+
+            b = NexusTileService.backends[dataset_s]
+
+            # if not b['up']:
+            #     success = b['backend'].try_connect()
+            #
+            #     if not success:
+            #         raise NexusProcessingException(reason=f'Dataset {dataset_s} is currently unavailable')
+            #     else:
+            #         NexusTileService.backends[dataset_s]['up'] = True
+
+            return b['backend']
+
+
+    @staticmethod
+    def _get_datasets_store():
+        solr_url = NexusTileService.ds_config.get("solr", "host")
+        solr_core = NexusTileService.ds_config.get("solr", "core")
+        solr_kwargs = {}
+
+        if NexusTileService.ds_config.has_option("solr", "time_out"):
+            solr_kwargs["timeout"] = NexusTileService.ds_config.get("solr", "time_out")
+
+        with SOLR_LOCK:
+            solrcon = getattr(thread_local, 'solrcon', None)
+            if solrcon is None:
+                solr_url = '%s/solr/%s' % (solr_url, solr_core)
+                solrcon = pysolr.Solr(solr_url, **solr_kwargs)
+                thread_local.solrcon = solrcon
+
+            return solrcon
+
+    @staticmethod
+    def _update_datasets():
+        update_logger = logging.getLogger("nexus-tile-svc.backends")
+        solrcon = NexusTileService._get_datasets_store()
+
+        update_logger.info('Executing Solr query to check for new datasets')
+
+        present_datasets = {None, '__nexusproto__'}
+        next_cursor_mark = '*'
+
+        added_datasets = 0
+
+        while True:
+            response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc')
+
+            try:
+                response_cursor_mark = response.nextCursorMark
+            except AttributeError:
+                break
+
+            if response_cursor_mark == next_cursor_mark:
+                break
             else:
-                raise ValueError("Error reading datastore from config file")
+                next_cursor_mark = response_cursor_mark
+
+            for dataset in response.docs:
+                d_id = dataset['dataset_s']
+                store_type = dataset.get('store_type_s', 'nexusproto')
+
+                present_datasets.add(d_id)
+
+                if d_id in NexusTileService.backends:
+                    continue
+                    # is_up = NexusTileService.backends[d_id]['backend'].try_connect()
+
+                added_datasets += 1
+
+                if store_type == 'nexus_proto' or store_type == 'nexusproto':
+                    update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend")
+                    NexusTileService.backends[d_id] = NexusTileService.backends[None]
+                elif store_type == 'zarr':
+                    update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend")
+
+                    ds_config = json.loads(dataset['config'][0])
+                    try:
+                        NexusTileService.backends[d_id] = {
+                            'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config),
+                            'up': True
+                        }
+                    except NexusTileServiceException:
+                        added_datasets -= 1
+                else:
+                    update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}')
+                    added_datasets -= 1
+
+        removed_datasets = set(NexusTileService.backends.keys()).difference(present_datasets)
+
+        if len(removed_datasets) > 0:
+            update_logger.info(f'{len(removed_datasets)} old datasets marked for removal')
+
+        for dataset in removed_datasets:
+            update_logger.info(f"Removing dataset {dataset}")
+            del NexusTileService.backends[dataset]
+
+        update_logger.info(f'Finished dataset update: {added_datasets} added, {len(removed_datasets)} removed, '
+                           f'{len(NexusTileService.backends) - 2} total')
+
+    # Update the config (e.g. credentials) of an existing dataset
+    @staticmethod
+    def user_ds_update(name, config):
+        solr = NexusTileService._get_datasets_store()
+
+        docs = solr.search(f'dataset_s:{name}').docs
+
+        if len(docs) != 1:
+            raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}')
+
+        ds = docs[0]
+
+        if 'source_s' not in ds or ds['source_s'] == 'collection_config':
+            raise ValueError('Provided dataset is sourced from the collection config and cannot be updated')
+
+        config_dict = json.loads(ds['config'][0])
+
+        config_dict['config'] = config
+
+        solr.delete(id=ds['id'])
+        solr.add([{
+            'id': name,
+            'dataset_s': name,
+            'latest_update_l': int(datetime.now().timestamp()),
+            'store_type_s': ds['store_type_s'],
+            'config': json.dumps(config_dict),
+            'source_s': 'user_added'
+        }])
+        solr.commit()
+
+        logger.info(f'Updated dataset {name} in Solr. Updating backends')
+
+        with DS_LOCK:
+            NexusTileService._update_datasets()
+
+        return {'success': True}
+
+    # Add dataset + backend
+    @staticmethod
+    def user_ds_add(name, path, config, type='zarr'):
+        solr = NexusTileService._get_datasets_store()
+
+        docs = solr.search(f'dataset_s:{name}').docs
+
+        if len(docs) > 0:
+            raise ValueError(f'Dataset {name} already exists')
+
+        config_dict = {
+            'path': path,
+            'config': config
+        }
+
+        solr.add([{
+            'id': name,
+            'dataset_s': name,
+            'latest_update_l': int(datetime.now().timestamp()),
+            'store_type_s': type,
+            'config': json.dumps(config_dict),
+            'source_s': 'user_added'
+        }])
+        solr.commit()
+
+        logger.info(f'Added dataset {name} to Solr. Updating backends')
+
+        with DS_LOCK:
+            NexusTileService._update_datasets()
+
+        return {'success': True}
 
-        if not skipMetadatastore:
-            metadatastore = self._config.get("metadatastore", "store", fallback='solr')
-            if metadatastore == "solr":
-                self._metadatastore = SolrProxy.SolrProxy(self._config)
-            elif metadatastore == "elasticsearch":
-                self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config)
+    # Delete dataset backend (error if it's a hardcoded one)
+    @staticmethod
+    def user_ds_delete(name):
+        solr = NexusTileService._get_datasets_store()
+
+        docs = solr.search(f'dataset_s:{name}').docs
+
+        if len(docs) != 1:
+            raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}')
+
+        ds = docs[0]
+
+        if 'source_s' not in ds or ds['source_s'] == 'collection_config':
+            raise ValueError('Provided dataset is sourced from the collection config and cannot be deleted')
+
+        solr.delete(id=ds['id'])
+        solr.commit()
+
+        logger.info(f'Removed dataset {name} from Solr. Updating backends')
+
+        with DS_LOCK:
+            NexusTileService._update_datasets()
+
+        return {'success': True}
 
     def override_config(self, config):
         for section in config.sections():
@@ -113,109 +377,90 @@ class NexusTileService(object):
                 for option in config.options(section):
                     if config.get(section, option) is not None:
                         self._config.set(section, option, config.get(section, option))
+            if NexusTileService.ds_config.has_section(section):  # only override preexisting sections; ignore the others
+                for option in config.options(section):
+                    if config.get(section, option) is not None:
+                        NexusTileService.ds_config.set(section, option, config.get(section, option))
 
     def get_dataseries_list(self, simple=False):
-        if simple:
-            return self._metadatastore.get_data_series_list_simple()
-        else:
-            return self._metadatastore.get_data_series_list()
+        datasets = []
+        for backend in set([b['backend'] for b in NexusTileService.backends.values() if b['up']]):
+            datasets.extend(backend.get_dataseries_list(simple))
+
+        return datasets
+
 
     @tile_data()
+    @catch_not_implemented
     def find_tile_by_id(self, tile_id, **kwargs):
-        return self._metadatastore.find_tile_by_id(tile_id)
+        tile = URL(tile_id)
+
+        if tile.scheme == 'nts':
+            return NexusTileService._get_backend(tile.path).find_tile_by_id(tile_id)
+        else:
+            return NexusTileService._get_backend('__nexusproto__').find_tile_by_id(tile_id)
 
     @tile_data()
+    @catch_not_implemented
     def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
-        return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
+        if ds is None:
+            return [self.find_tile_by_id(tid, **kwargs, fetch_data=False) for tid in tile_ids]
+        return NexusTileService._get_backend(ds).find_tiles_by_id(tile_ids, ds=ds, **kwargs)
 
+    @catch_not_implemented
     def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
                                metrics_callback=None, **kwargs):
-        start = datetime.now()
-        result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
-                                                            end_time,
-                                                            **kwargs)
-        duration = (datetime.now() - start).total_seconds()
-        if metrics_callback:
-            metrics_callback(solr=duration)
-        return result
+        return NexusTileService._get_backend(dataset).find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon,
+                                                                             dataset, start_time, end_time,
+                                                                             metrics_callback, **kwargs)
 
     @tile_data()
+    @catch_not_implemented
     def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
-        """
-        Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
-        polygon and the closest day of year.
-
-        For example:
-            given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
-            search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
-
-        Valid matches:
-            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
-            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
-
-        Invalid matches:
-            minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
-            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
-            minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
-
-        :param bounding_polygon: The exact bounding polygon of tiles to search for
-        :param ds: The dataset name being searched
-        :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
-        :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
-        """
-        try:
-            tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds,
-                                                                                        day_of_year)
-        except IndexError:
-            raise NexusTileServiceException("No tile found.").with_traceback(sys.exc_info()[2])
-
-        return tile
+        return NexusTileService._get_backend(ds).find_tile_by_polygon_and_most_recent_day_of_year(
+            bounding_polygon, ds, day_of_year, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
-                                                                 rows=5000,
-                                                                 **kwargs)
+        return NexusTileService._get_backend(dataset).find_all_tiles_in_box_at_time(
+            min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
-        return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
-                                                                     **kwargs)
+        return NexusTileService._get_backend(dataset).find_all_tiles_in_polygon_at_time(
+            bounding_polygon, dataset, time, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
         # Find tiles that fall in the given box in the Solr index
         if type(start_time) is datetime:
             start_time = (start_time - EPOCH).total_seconds()
         if type(end_time) is datetime:
             end_time = (end_time - EPOCH).total_seconds()
-        return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
-                                                                     end_time, **kwargs)
+
+        return NexusTileService._get_backend(ds).find_tiles_in_box(
+            min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
-        # Find tiles that fall within the polygon in the Solr index
-        if 'sort' in list(kwargs.keys()):
-            tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
-        else:
-            tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time,
-                                                                              end_time,
-                                                                              **kwargs)
-        return tiles
+        return NexusTileService._get_backend(ds).find_tiles_in_polygon(
+            bounding_polygon, ds, start_time, end_time, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
-        """
-        Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
-        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
-        :param ds: The dataset name to search
-        :param start_time: The start time to search for tiles
-        :param end_time: The end time to search for tiles
-        :return: A list of tiles
-        """
-        tiles = self._metadatastore.find_all_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs)
-
-        return tiles
+        return NexusTileService._get_backend(ds).find_tiles_by_metadata(
+            metadata, ds, start_time, end_time, **kwargs
+        )
 
     def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
         """
@@ -233,6 +478,7 @@ class NexusTileService(object):
         return tiles
 
     @tile_data()
+    @catch_not_implemented
     def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
         """
         The method will return tiles with the exact given bounds within the time range. It differs from
@@ -246,16 +492,16 @@ class NexusTileService(object):
         :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
         :return:
         """
-        tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds,
-                                                               start_time,
-                                                               end_time)
-        return tiles
+        return NexusTileService._get_backend(ds).find_tiles_by_exact_bounds(
+            bounds, ds, start_time, end_time, **kwargs
+        )
 
     @tile_data()
+    @catch_not_implemented
     def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
-                                                                   rows=5000,
-                                                                   **kwargs)
+        return NexusTileService._get_backend(dataset).find_all_boundary_tiles_at_time(
+            min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+        )
 
     def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
                                  **kwargs):
@@ -275,13 +521,15 @@ class NexusTileService(object):
 
         return tiles
 
+    @catch_not_implemented
     def get_min_max_time_by_granule(self, ds, granule_name):
-        start_time, end_time = self._metadatastore.find_min_max_date_from_granule(ds, granule_name)
-
-        return start_time, end_time
+        return NexusTileService._get_backend(ds).get_min_max_time_by_granule(
+            ds, granule_name
+        )
 
+    @catch_not_implemented
     def get_dataset_overall_stats(self, ds):
-        return self._metadatastore.get_data_series_stats(ds)
+        return NexusTileService._get_backend(ds).get_dataset_overall_stats(ds)
 
     def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
         tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs)
@@ -301,24 +549,19 @@ class NexusTileService(object):
 
         return tiles
 
+    @catch_not_implemented
     def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
-                                                                      **kwargs)
-
-        return tiles
+        return NexusTileService._get_backend(dataset).get_stats_within_box_at_time(
+            min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+        )
 
-    def get_bounding_box(self, tile_ids):
+    def get_bounding_box(self, tile_ids, ds=None):
         """
         Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
         :param tile_ids: List of tile ids
         :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
         """
-        tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'],
-                                      fetch_data=False, rows=len(tile_ids))
-        polys = []
-        for tile in tiles:
-            polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat))
-        return box(*MultiPolygon(polys).bounds)
+        return NexusTileService._get_backend(ds).get_bounding_box(tile_ids)
 
     def get_min_time(self, tile_ids, ds=None):
         """
@@ -327,8 +570,7 @@ class NexusTileService(object):
         :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
         :return: long time in seconds since epoch
         """
-        min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds)
-        return int((min_time - EPOCH).total_seconds())
+        return NexusTileService._get_backend(ds).get_min_time(tile_ids, ds)
 
     def get_max_time(self, tile_ids, ds=None):
         """
@@ -337,8 +579,7 @@ class NexusTileService(object):
         :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
         :return: long time in seconds since epoch
         """
-        max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds)
-        return int((max_time - EPOCH).total_seconds())
+        return int(NexusTileService._get_backend(ds).get_max_time(tile_ids))
 
     def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
         """
@@ -352,8 +593,19 @@ class NexusTileService(object):
         bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time)
         return [box(*b) for b in bounds]
 
-    def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles):
+    def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+        """
+        Return number of tiles that match search criteria.
+        :param ds: The dataset name to search
+        :param bounding_polygon: The polygon to search for tiles
+        :param start_time: The start time to search for tiles
+        :param end_time: The end time to search for tiles
+        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+        :return: number of tiles that match search criteria
+        """
+        return NexusTileService._get_backend(ds).get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs)
 
+    def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles):
         for tile in tiles:
             tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat)
             tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon)
@@ -438,45 +690,12 @@ class NexusTileService(object):
 
         return tiles
 
-    def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
-        """
-        Return number of tiles that match search criteria.
-        :param ds: The dataset name to search
-        :param bounding_polygon: The polygon to search for tiles
-        :param start_time: The start time to search for tiles
-        :param end_time: The end time to search for tiles
-        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
-        :return: number of tiles that match search criteria
-        """
-        return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs)
-
     def fetch_data_for_tiles(self, *tiles):
+        dataset = tiles[0].dataset
 
-        nexus_tile_ids = set([tile.tile_id for tile in tiles])
-        matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids)
-
-        tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data}
-
-        missing_data = nexus_tile_ids.difference(list(tile_data_by_id.keys()))
-        if len(missing_data) > 0:
-            raise Exception("Missing data for tile_id(s) %s." % missing_data)
-
-        for a_tile in tiles:
-            lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta()
-
-            a_tile.latitudes = lats
-            a_tile.longitudes = lons
-            a_tile.times = times
-            a_tile.data = data
-            a_tile.meta_data = meta
-            a_tile.is_multi = is_multi_var
-
-            del (tile_data_by_id[a_tile.tile_id])
-
-        return tiles
+        return NexusTileService._get_backend(dataset).fetch_data_for_tiles(*tiles)
 
     def _metadata_store_docs_to_tiles(self, *store_docs):
-
         tiles = []
         for store_doc in store_docs:
             tile = Tile()
@@ -573,7 +792,6 @@ class NexusTileService(object):
             except KeyError:
                 pass
 
-
             if 'tile_var_name_ss' in store_doc:
                 tile.variables = []
                 for var_name in store_doc['tile_var_name_ss']:
@@ -588,13 +806,6 @@ class NexusTileService(object):
 
         return tiles
 
-    def pingSolr(self):
-        status = self._metadatastore.ping()
-        if status and status["status"] == "OK":
-            return True
-        else:
-            return False
-
     @staticmethod
     def _get_config_files(filename):
         log = logging.getLogger(__name__)
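Taken together, the dataset-management entry points above let a zarr collection be registered at runtime without restarting the service. A hedged sketch of such a registration follows; the dataset name, bucket path, variable and coordinate names are hypothetical, and the call assumes a reachable Solr core as configured in datasets.ini. user_ds_update and user_ds_delete follow the same pattern for credential rotation and removal of user-added datasets; datasets sourced from the collection config are refused.

    from nexustiles.nexustiles import NexusTileService

    # Hypothetical registration: user_ds_add writes a Solr document with
    # store_type_s='zarr' and a JSON 'config' field of the form
    # {"path": ..., "config": {...}}, then runs _update_datasets() to open the backend.
    NexusTileService.user_ds_add(
        name='MY_ZARR_DS',
        path='s3://my-bucket/my-dataset.zarr',
        config={
            'variable': 'analysed_sst',
            'coords': {'latitude': 'lat', 'longitude': 'lon', 'time': 'time'},
            'aws': {'public': True}
        }
    )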
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index f91f180..05b7894 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -21,4 +21,11 @@ urllib3==1.26.2
 requests
 nexusproto
 Shapely
-numpy==1.24.3
+s3fs==2022.5.0
+fsspec==2022.5.0
+botocore==1.24.21
+aiohttp==3.8.1
+xarray~=2022.3.0
+zarr>=2.11.3
+pandas<2.1.0rc0  # Temporary restriction because 2.1.0rc0 fails to build
+
diff --git a/data-access/setup.py b/data-access/setup.py
index ab0248f..e539e1e 100644
--- a/data-access/setup.py
+++ b/data-access/setup.py
@@ -12,11 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import setuptools
 from setuptools import setup
 
-with open('../VERSION.txt', 'r') as f:
-    __version__ = f.read()
+try:
+    with open('../VERSION.txt', 'r') as f:
+        __version__ = f.read()
+except OSError:
+    __version__ = None
 
 
 with open('requirements.txt') as f:
@@ -32,8 +35,13 @@ setup(
     description="NEXUS API.",
     long_description=open('README.md').read(),
 
-    packages=['nexustiles', 'nexustiles.model', 'nexustiles.dao'],
-    package_data={'nexustiles': ['config/datastores.ini.default', 'config/datastores.ini']},
+    packages=setuptools.find_packages(),  # ['nexustiles', 'nexustiles.model', 'nexustiles.dao'],
+    package_data={
+        'nexustiles':
+            ['config/datasets.ini.default', 'config/datasets.ini'],
+        'nexustiles.backends.nexusproto':
+            ['config/datastores.ini.default', 'config/datastores.ini']
+    },
     platforms='any',
     python_requires='~=3.8',
     install_requires=pip_requirements,