Posted to commits@sdap.apache.org by rk...@apache.org on 2024/03/07 17:59:21 UTC
(incubator-sdap-nexus) branch develop updated: SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/develop by this push:
new 90e0b00 SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)
90e0b00 is described below
commit 90e0b004ddaec3a508688f92fd506b757b80f41c
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Thu Mar 7 09:59:16 2024 -0800
SDAP-472 - data-access overhaul to support multiple simultaneous data backends (#294)
* Separated NTS backends
* n/a
* More nts backend stuff
* Working(?) np backend
* Working(?) np backend
* gitignore ini
* ASF headers
* First functioning test of 2 simultaneous backends
* Removed accidentally committed ini files
* Working zarr backend ds list
+ datasets are no longer case sensitive
+ handling for failed zarr ds opens (bad path, bad creds, &c)
* Capture and handle NTS requests routed to backend that doesn't (yet) support them
* analysis setup fails to find VERSION.txt when building locally
* Implemented more NTS functions in zarr backend
* Added misc backend time metrics record field in NCSH
* fixes
* Dynamic dataset management
* Dynamic dataset management
* Dataset management
* TimeSeriesSpark support
* Update backend dict on dataset mgmt query
* Fixes and improvements
* Adapted matchup to work with zarr backends
* Zarr support
- Distinct slices of time is now default
- No longer assuming+shaping as multivar tiles unless needed
* DDAS adjustments
* find_tile_by_polygon_and_most_recent_day_of_year impl
* Don't sel by time if neither max nor min time is given
* Fix not calling partial when needed
* Pinned s3fs and fsspec versions
* Fixed some dependencies to ensure image builds properly + s3fs works
* Config override for backends
* Deps update
* Add metadata from Zarr collection to /list
* Zarr: Probe lat order and flip if necessary
* Strip quotes from variable names
CM can sometimes publish variable names with extra quotes, resulting in KeyErrors
* removed resultSizeLimit param from matchup
* Add # of primaries/average secondaries to job output
* rename to executionId
* update changelog
* add totalSecondaryMatched field to /job output
* num unique secondaries addition
* updated docs to use correct sea_water_temperature param name
* bugfix
* fix division by zero bug
* add params to dataset management handler classes
* add page number to default filename for matchup output
* pagination improvements
* removed debugging line
* changelog
* Update helm cassandra dependency (#289)
* Update helm cassandra dependency
* Bump default cassandra PV to 4
* Bump default cassandra PV to 4 in tools
* Changelog
* Fixed small documentation issue
---------
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
* stac catalog
* Updated openapi spec
* move stac endpoints to matchup tag in openapi spec
* SDAP-507 - Changes to remove geos sub-dependency
* SDAP-507 - Changelog
* SDAP-507 - Changes to remove geos sub-dependency
* SDAP-507 - Changelog
* delete instead of comment out
* Revert "Update helm cassandra dependency (#289)"
This reverts commit 1e8cc4e9d31d295e172c0db4bba61a5776642bea.
* deleted disabled endpoint files
* fix bug where still-running jobs failed /job endpoint due to missing metadata
* Update .asf.yaml (#293)
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
* Moved changelog entries
* SDAP-472 changelog entries
---------
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
Co-authored-by: skorper <st...@gmail.com>
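For context, the Zarr dataset management endpoints introduced by this commit can be exercised roughly as follows. This is a minimal sketch assuming a locally running SDAP instance; the host, dataset name, and Zarr path are hypothetical, and only the endpoint path, query arguments, and body schema come from the code in the diff below.

import json
import requests  # assumed available; not a dependency added by this commit

# Config body matching CONFIG_SCHEMA in analysis/webservice/management/Datasets.py
config = {
    'variable': 'analysed_sst',
    'coords': {'latitude': 'lat', 'longitude': 'lon', 'time': 'time'},
    'aws': {'public': True},
}

# 'name' and 'path' are query arguments; the config travels in the POST body
resp = requests.post(
    'http://localhost:8083/datasets/add',
    params={'name': 'my_zarr_ds', 'path': 's3://my-bucket/my-dataset.zarr'},
    headers={'Content-Type': 'application/json'},
    data=json.dumps(config),
)
print(resp.status_code)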
---
.gitignore | 3 +-
CHANGELOG.md | 4 +
analysis/conda-requirements.txt | 5 +-
analysis/setup.py | 7 +-
.../algorithms/DailyDifferenceAverage.py | 3 +-
.../algorithms/StandardDeviationSearch.py | 2 +-
.../DailyDifferenceAverageSpark.py | 6 +-
.../webservice/algorithms_spark/HofMoellerSpark.py | 8 +-
analysis/webservice/algorithms_spark/Matchup.py | 16 +-
.../algorithms_spark/NexusCalcSparkHandler.py | 3 +
.../webservice/algorithms_spark/TimeSeriesSpark.py | 5 +-
analysis/webservice/config/web.ini | 2 +-
analysis/webservice/management/Datasets.py | 242 +++++++++
.../webservice/management}/__init__.py | 2 +
.../nexus_tornado/app_builders/NexusAppBuilder.py | 2 +-
.../request/handlers/NexusRequestHandler.py | 29 ++
analysis/webservice/webmodel/NexusRequestObject.py | 6 +
data-access/nexustiles/AbstractTileService.py | 202 ++++++++
.../nexustiles/{dao => backends}/__init__.py | 1 +
.../{dao => backends/nexusproto}/__init__.py | 1 +
.../nexusproto/backend.py} | 70 +--
.../nexusproto}/config/datastores.ini.default | 0
.../nexusproto}/dao/CassandraProxy.py | 0
.../{ => backends/nexusproto}/dao/DynamoProxy.py | 0
.../nexusproto}/dao/ElasticsearchProxy.py | 0
.../{ => backends/nexusproto}/dao/S3Proxy.py | 0
.../{ => backends/nexusproto}/dao/SolrProxy.py | 3 +-
.../{ => backends/nexusproto}/dao/__init__.py | 0
.../nexustiles/{dao => backends/zarr}/__init__.py | 1 +
data-access/nexustiles/backends/zarr/backend.py | 532 ++++++++++++++++++++
.../__init__.py => config/datasets.ini.default} | 4 +
.../nexustiles/{dao/__init__.py => exception.py} | 4 +
data-access/nexustiles/nexustiles.py | 557 ++++++++++++++-------
data-access/requirements.txt | 9 +-
data-access/setup.py | 18 +-
35 files changed, 1484 insertions(+), 263 deletions(-)
diff --git a/.gitignore b/.gitignore
index 12ab2d6..23f8435 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,5 +4,6 @@
*.idea
*.DS_Store
analysis/webservice/algorithms/doms/domsconfig.ini
-data-access/nexustiles/config/datastores.ini
+data-access/nexustiles/backends/nexusproto/config/datastores.ini
+data-access/nexustiles/config/datasets.ini
venv/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0f873d..ff9e717 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,12 +10,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added STAC Catalog endpoint for matchup outputs
- SDAP-508: Added spatial extents to the satellite dataset entries in `/list` and `/cdmslist`
- SDAP-505: Added support for DOMS insitu api
+- SDAP-472:
+ - Support for Zarr backend (gridded data only)
+ - Dataset management endpoints for Zarr datasets
### Changed
- SDAP-493:
- Updated /job endpoint to use `executionId` terminology for consistency with existing `/cdmsresults` endpoint
- Updated /job endpoint with details about number of primary and secondary tiles.
- SDAP-500: Improvements to SDAP Asynchronous Jobs
- SDAP-499: Added page number to default filename for matchup output
+- SDAP-472: Overhauled `data-access` to support multiple backends for simultaneous support of multiple ARD formats
### Deprecated
### Removed
- SDAP-493:
diff --git a/analysis/conda-requirements.txt b/analysis/conda-requirements.txt
index e27bdea..22dff06 100644
--- a/analysis/conda-requirements.txt
+++ b/analysis/conda-requirements.txt
@@ -22,7 +22,8 @@ pytz==2021.1
utm==0.6.0
shapely==1.7.1
backports.functools_lru_cache==1.6.1
-boto3==1.16.63
+boto3>=1.16.63
+botocore==1.24.21
pillow==8.1.0
mpld3=0.5.1
tornado==6.1
@@ -33,4 +34,4 @@ gdal==3.2.1
mock==4.0.3
importlib_metadata==4.11.4
#singledispatch==3.4.0.3
-
+schema
diff --git a/analysis/setup.py b/analysis/setup.py
index 8fbc617..2e09815 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -17,8 +17,11 @@
import setuptools
from subprocess import check_call, CalledProcessError
-with open('../VERSION.txt', 'r') as f:
- __version__ = f.read()
+try:
+ with open('../VERSION.txt', 'r') as f:
+ __version__ = f.read()
+except:
+ __version__ = None
try:
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
index 05274fc..c6c8495 100644
--- a/analysis/webservice/algorithms/DailyDifferenceAverage.py
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -21,7 +21,8 @@ from multiprocessing.dummy import Pool, Manager
import numpy as np
import pytz
-from nexustiles.nexustiles import NexusTileService, NexusTileServiceException
+from nexustiles.nexustiles import NexusTileService
+from nexustiles.exception import NexusTileServiceException
from shapely.geometry import box
from webservice.NexusHandler import nexus_handler
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
index ae0566f..26451cb 100644
--- a/analysis/webservice/algorithms/StandardDeviationSearch.py
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -19,7 +19,7 @@ import logging
from datetime import datetime
from functools import partial
-from nexustiles.nexustiles import NexusTileServiceException
+from nexustiles.exception import NexusTileServiceException
from pytz import timezone
from webservice.NexusHandler import nexus_handler
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
index b424578..12f7dee 100644
--- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -324,7 +324,7 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat
for tile_id in tile_ids:
# Get the dataset tile
try:
- dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id)
+ dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id, dataset.value)
except NoDatasetTile:
# This should only happen if all measurements in a tile become masked after applying the bounding polygon
continue
@@ -348,12 +348,12 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat
return chain(*diff_generators)
-def get_dataset_tile(tile_service, search_bounding_shape, tile_id):
+def get_dataset_tile(tile_service, search_bounding_shape, tile_id, dataset):
the_time = datetime.now()
try:
# Load the dataset tile
- dataset_tile = tile_service.find_tile_by_id(tile_id)[0]
+ dataset_tile = tile_service.find_tile_by_id(tile_id, ds=dataset)[0]
# Mask it to the search domain
dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0]
except IndexError:
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index d50006a..e876a11 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -44,12 +44,12 @@ class HofMoellerCalculator(object):
def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
(latlon, tile_id, index,
- min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+ min_lat, max_lat, min_lon, max_lon, dataset) = tile_in_spark
tile_service = tile_service_factory()
try:
# Load the dataset tile
- tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
+ tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback, ds=dataset)[0]
calculation_start = datetime.now()
# Mask it to the search domain
tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
@@ -352,7 +352,7 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
min_lon, min_lat, max_lon, max_lat = bbox.bounds
- nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time,
metrics_callback=metrics_record.record_metrics,
@@ -408,7 +408,7 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
min_lon, min_lat, max_lon, max_lat = bbox.bounds
- nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time,
metrics_callback=metrics_record.record_metrics,
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index ee43624..e738a69 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -841,9 +841,9 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
tile_service = tile_service_factory()
# Determine the spatial temporal extents of this partition of tiles
- tiles_bbox = tile_service.get_bounding_box(tile_ids)
- tiles_min_time = tile_service.get_min_time(tile_ids)
- tiles_max_time = tile_service.get_max_time(tile_ids)
+ tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value)
+ tiles_min_time = tile_service.get_min_time(tile_ids, ds=primary_b.value)
+ tiles_max_time = tile_service.get_max_time(tile_ids, ds=primary_b.value)
# Increase spatial extents by the radius tolerance
matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
@@ -922,7 +922,7 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
edge_results = []
for tile in matchup_tiles:
# Retrieve tile data and convert to lat/lon projection
- tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True)
+ tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True, ds=secondary_b.value)
tile = tiles[0]
valid_indices = tile.get_indices()
@@ -948,14 +948,14 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
# The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
- parameter_b.value, rt_b.value, aeqd_proj) for tile_id
- in tile_ids]
+ parameter_b.value, rt_b.value, aeqd_proj, primary_b.value)
+ for tile_id in tile_ids]
return chain(*match_generators)
def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
- search_parameter, radius_tolerance, aeqd_proj):
+ search_parameter, radius_tolerance, aeqd_proj, primary_ds):
from nexustiles.model.nexusmodel import NexusPoint
from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
@@ -963,7 +963,7 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
try:
the_time = datetime.now()
tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
- tile_service.find_tile_by_id(tile_id))[0]
+ tile_service.find_tile_by_id(tile_id, ds=primary_ds))[0]
print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
except IndexError:
# This should only happen if all measurements in a tile become masked after applying the bounding polygon
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 4499773..e033467 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -362,6 +362,9 @@ class NexusCalcSparkHandler(NexusCalcHandler):
SparkAccumulatorMetricsField(key='solr',
description='Cumulative time to fetch data from Solr',
accumulator=self._sc.accumulator(0)),
+ SparkAccumulatorMetricsField(key='backend',
+ description='Cumulative time to fetch data from external backend(s)',
+ accumulator=self._sc.accumulator(0)),
SparkAccumulatorMetricsField(key='calculation',
description='Cumulative time to do calculations',
accumulator=self._sc.accumulator(0)),
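The new 'backend' accumulator above is fed through the existing metrics_callback mechanism. A minimal sketch of how a backend method reports its fetch time (the Zarr backend later in this commit does this in find_days_in_range_asc; the function below is hypothetical):

from datetime import datetime

def timed_backend_query(metrics_callback=None):
    start = datetime.now()
    times = []  # placeholder for an actual query against the external backend
    if metrics_callback:
        # accumulates into the 'backend' SparkAccumulatorMetricsField above
        metrics_callback(backend=(datetime.now() - start).total_seconds())
    return times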
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 90ae14d..804d3ec 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -483,8 +483,9 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
timestamps[0],
timestamps[-1],
rows=5000,
- metrics_callback=metrics_callback)
-
+ metrics_callback=metrics_callback,
+ distinct=True)
+
calculation_start = datetime.now()
tile_dict = {}
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 8584975..a9e3dda 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -29,4 +29,4 @@ static_enabled=true
static_dir=static
[modules]
-module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
\ No newline at end of file
+module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms,webservice.management
\ No newline at end of file
diff --git a/analysis/webservice/management/Datasets.py b/analysis/webservice/management/Datasets.py
new file mode 100644
index 0000000..40b267f
--- /dev/null
+++ b/analysis/webservice/management/Datasets.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from yaml import load
+import json
+from webservice.NexusHandler import nexus_handler
+from nexustiles.nexustiles import NexusTileService
+from webservice.webmodel import NexusRequestObject, NexusProcessingException
+
+from schema import Schema, Or, SchemaError
+from schema import Optional as Opt
+
+from urllib.parse import urlparse
+try:
+ from yaml import CLoader as Loader
+except ImportError:
+ from yaml import Loader
+
+
+CONFIG_SCHEMA = Schema({
+ Or('variable', 'variables'): Or(str, [str]),
+ 'coords': {
+ 'latitude': str,
+ 'longitude': str,
+ 'time': str,
+ Opt('depth'): str
+ },
+ Opt('aws'): {
+ Opt('accessKeyID'): str,
+ Opt('secretAccessKey'): str,
+ 'public': bool,
+ Opt('region'): str
+ }
+})
+
+
+class DatasetManagement:
+ @classmethod
+ def validate(cls):
+ pass
+
+ @staticmethod
+ def parse_config(request: NexusRequestObject):
+ content_type = request.get_headers()['Content-Type']
+
+ if content_type in ['application/json', 'application/x-json']:
+ config_dict = json.loads(request.get_request_body())
+ elif content_type == 'application/yaml':
+ config_dict = load(request.get_request_body(), Loader=Loader)
+ else:
+ raise NexusProcessingException(reason='Invalid Content-Type header', code=400)
+
+ try:
+ CONFIG_SCHEMA.validate(config_dict)
+
+ if 'aws' in config_dict:
+ if not config_dict['aws']['public']:
+ if 'accessKeyID' not in config_dict['aws'] or 'secretAccessKey' not in config_dict['aws']:
+ raise NexusProcessingException(
+ reason='Must provide AWS creds for non-public bucket',
+ code=400
+ )
+ except SchemaError as e:
+ raise NexusProcessingException(
+ reason=str(e),
+ code=400
+ )
+
+ return config_dict
+
+
+class Response:
+ def __init__(self, response):
+ self.response = response if response is not None else {}
+
+ def toJson(self):
+ return json.dumps(self.response)
+
+
+@nexus_handler
+class DatasetAdd(DatasetManagement):
+ name = 'Add dataset'
+ path = '/datasets/add'
+ description = "Add new Zarr dataset to running SDAP instance"
+ params = {
+ "name": {
+ "name": "Dataset name",
+ "type": "string",
+ "description": "Name of new dataset to add"
+ },
+ "path": {
+ "name": "Path or URL",
+ "type": "string",
+ "description": "Path/URL of Zarr group"
+ },
+ "body": {
+ "name": "Request body",
+ "type": "application/json OR application/yaml",
+ "description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
+ }
+ }
+
+ def __init__(self, **args):
+ pass
+
+ def calc(self, request: NexusRequestObject, **args):
+ try:
+ config = DatasetManagement.parse_config(request)
+ except Exception as e:
+ raise NexusProcessingException(
+ reason=repr(e),
+ code=400
+ )
+
+ name = request.get_argument('name')
+
+ if name is None:
+ raise NexusProcessingException(
+ reason='Name argument must be provided',
+ code=400
+ )
+
+ path = request.get_argument('path')
+
+ if path is None:
+ raise NexusProcessingException(
+ reason='Path argument must be provided',
+ code=400
+ )
+
+ try:
+ if urlparse(path).scheme not in ['file','','s3']:
+ raise NexusProcessingException(
+ reason='Dataset URL must be for a local file or S3 URL',
+ code=400
+ )
+ except ValueError:
+ raise NexusProcessingException(
+ reason='Could not parse path URL', code=400
+ )
+
+ try:
+ NexusTileService.user_ds_add(name, path, config)
+ except Exception as e:
+ raise NexusProcessingException(
+ reason=repr(e),
+ code=500
+ )
+
+
+@nexus_handler
+class DatasetUpdate(DatasetManagement):
+ name = 'Update dynamically added dataset'
+ path = '/datasets/update'
+ description = "Update Zarr dataset in running SDAP instance"
+ params = {
+ "name": {
+ "name": "Dataset name",
+ "type": "string",
+ "description": "Name of dataset to update"
+ },
+ "body": {
+ "name": "Request body",
+ "type": "application/json OR application/yaml",
+ "description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
+ }
+ }
+
+ def __init__(self, **args):
+ pass
+
+ def calc(self, request: NexusRequestObject, **args):
+ try:
+ config = DatasetManagement.parse_config(request)
+ except Exception as e:
+ raise NexusProcessingException(
+ reason=repr(e),
+ code=400
+ )
+
+ name = request.get_argument('name')
+
+ if name is None:
+ raise NexusProcessingException(
+ reason='Name argument must be provided',
+ code=400
+ )
+
+ try:
+ return Response(NexusTileService.user_ds_update(name, config))
+ except Exception as e:
+ raise NexusProcessingException(
+ reason=repr(e),
+ code=500
+ )
+
+
+@nexus_handler
+class DatasetDelete(DatasetManagement):
+ name = 'Remove dataset'
+ path = '/datasets/remove'
+ description = "Remove Zarr dataset from running SDAP instance"
+ params = {
+ "name": {
+ "name": "Dataset name",
+ "type": "string",
+ "description": "Name of dataset to remove"
+ }
+ }
+
+ def __init__(self, **args):
+ pass
+
+ def calc(self, request: NexusRequestObject, **args):
+ name = request.get_argument('name')
+
+ if name is None:
+ raise NexusProcessingException(
+ reason='Name argument must be provided',
+ code=400
+ )
+
+ try:
+ return Response(NexusTileService.user_ds_delete(name))
+ except Exception as e:
+ raise NexusProcessingException(
+ reason=repr(e),
+ code=500
+ )
+
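For reference, here is a config document that passes the CONFIG_SCHEMA defined above and exercises the private-bucket branch that parse_config additionally enforces. A sketch only; the variable and coordinate names are hypothetical:

config = {
    'variables': ['analysed_sst', 'analysis_error'],
    'coords': {'latitude': 'lat', 'longitude': 'lon', 'time': 'time', 'depth': 'z'},
    'aws': {
        'public': False,
        # parse_config rejects public=False unless both keys below are present
        'accessKeyID': 'AKIA...',
        'secretAccessKey': '...',
        'region': 'us-west-2',
    },
}

CONFIG_SCHEMA.validate(config)  # raises schema.SchemaError if malformed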
diff --git a/data-access/nexustiles/dao/__init__.py b/analysis/webservice/management/__init__.py
similarity index 93%
copy from data-access/nexustiles/dao/__init__.py
copy to analysis/webservice/management/__init__.py
index 6acb5d1..7c9f5ef 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/analysis/webservice/management/__init__.py
@@ -12,3 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from webservice.management.Datasets import DatasetAdd
\ No newline at end of file
diff --git a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
index afe7d69..0179858 100644
--- a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
@@ -53,7 +53,7 @@ class NexusAppBuilder:
NexusHandler.executeInitializers(algorithm_config)
self.log.info("Initializing request ThreadPool to %s" % max_request_threads)
- tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
+ tile_service_factory = partial(NexusTileService, algorithm_config)
handler_args_builder = HandlerArgsBuilder(
max_request_threads,
tile_service_factory,
diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 95bddf4..7a55967 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -75,6 +75,35 @@ class NexusRequestHandler(tornado.web.RequestHandler):
except Exception as e:
self.async_onerror_callback(str(e), 500)
+ @tornado.gen.coroutine
+ def post(self):
+ self.logger.info("Received %s" % self._request_summary())
+
+ request = NexusRequestObject(self)
+
+ # create NexusCalcHandler which will process the request
+ instance = self.__clazz(**self._clazz_init_args)
+
+ try:
+ # process the request asynchronously on a different thread,
+ # the current tornado handler is still available to get other user requests
+ results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
+
+ if results:
+ try:
+ self.set_status(results.status_code)
+ except AttributeError:
+ pass
+
+ renderer = NexusRendererFactory.get_renderer(request)
+ renderer.render(self, results)
+
+ except NexusProcessingException as e:
+ self.async_onerror_callback(e.reason, e.code)
+
+ except Exception as e:
+ self.async_onerror_callback(str(e), 500)
+
def async_onerror_callback(self, reason, code=500):
self.logger.error("Error processing request", exc_info=True)
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py
index bbd2828..1896236 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -35,6 +35,12 @@ class NexusRequestObject(StatsComputeOptions):
self.requestHandler = reqHandler
StatsComputeOptions.__init__(self)
+ def get_headers(self):
+ return self.requestHandler.request.headers
+
+ def get_request_body(self):
+ return self.requestHandler.request.body
+
def get_argument(self, name, default=None):
return self.requestHandler.get_argument(name, default=default)
diff --git a/data-access/nexustiles/AbstractTileService.py b/data-access/nexustiles/AbstractTileService.py
new file mode 100644
index 0000000..2046778
--- /dev/null
+++ b/data-access/nexustiles/AbstractTileService.py
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from functools import reduce
+
+import numpy as np
+import numpy.ma as ma
+
+
+class AbstractTileService(ABC):
+ # @staticmethod
+ # @abstractmethod
+ # def open_dataset(dataset_s, **kwargs):
+ # pass
+
+ # @abstractmethod
+ # def try_connect(self) -> bool:
+ # raise NotImplementedError()
+
+ def __init__(self, dataset_name):
+ self._name = dataset_name
+
+ @abstractmethod
+ def get_dataseries_list(self, simple=False):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tile_by_id(self, tile_id, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+ metrics_callback=None, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
+ """
+ Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
+ polygon and the closest day of year.
+
+ For example:
+ given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
+ search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
+
+ Valid matches:
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
+
+ Invalid matches:
+ minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
+
+ :param bounding_polygon: The exact bounding polygon of tiles to search for
+ :param ds: The dataset name being searched
+ :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
+ :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
+ # Find tiles that fall in the given box in the Solr index
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
+ # Find tiles that fall within the polygon in the Solr index
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
+ """
+ Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
+ :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+ :param ds: The dataset name to search
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :return: A list of tiles
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
+ """
+ The method will return tiles with the exact given bounds within the time range. It differs from
+ find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to
+ doing a polygon intersection with the given bounds.
+
+ :param bounds: (minx, miny, maxx, maxy) bounds to search for
+ :param ds: Dataset name to search
+ :param start_time: Start time to search (seconds since epoch)
+ :param end_time: End time to search (seconds since epoch)
+ :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
+ :return:
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_min_max_time_by_granule(self, ds, granule_name):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_dataset_overall_stats(self, ds):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_bounding_box(self, tile_ids):
+ """
+ Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
+ :param tile_ids: List of tile ids
+ :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_min_time(self, tile_ids, ds=None):
+ """
+ Get the minimum tile date from the list of tile ids
+ :param tile_ids: List of tile ids
+ :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+ :return: long time in seconds since epoch
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_max_time(self, tile_ids, ds=None):
+ """
+ Get the maximum tile date from the list of tile ids
+ :param tile_ids: List of tile ids
+ :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+ :return: long time in seconds since epoch
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
+ """
+ Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range.
+ :param bounding_polygon: The bounding polygon of tiles to search for
+ :param ds: The dataset name to search
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+ """
+ Return number of tiles that match search criteria.
+ :param ds: The dataset name to search
+ :param bounding_polygon: The polygon to search for tiles
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+ :return: number of tiles that match search criteria
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def fetch_data_for_tiles(self, *tiles):
+ raise NotImplementedError()
+
+ @abstractmethod
+ def _metadata_store_docs_to_tiles(self, *store_docs):
+ raise NotImplementedError()
+
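Both backends that follow implement this interface. A toy illustration of the ABC pattern used here (a stub stand-in, not part of this commit): Python refuses to instantiate a subclass until every @abstractmethod is overridden.

from abc import ABC, abstractmethod

class MiniTileService(ABC):  # stand-in for AbstractTileService
    def __init__(self, dataset_name):
        self._name = dataset_name

    @abstractmethod
    def get_dataseries_list(self, simple=False):
        raise NotImplementedError()

class StubBackend(MiniTileService):
    def get_dataseries_list(self, simple=False):
        return [{'shortName': self._name, 'title': self._name, 'type': 'stub'}]

# MiniTileService('x')  # TypeError: can't instantiate abstract class
print(StubBackend('x').get_dataseries_list())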
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/__init__.py
@@ -12,3 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/nexusproto/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/nexusproto/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/nexusproto/__init__.py
@@ -12,3 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/backends/nexusproto/backend.py
similarity index 91%
copy from data-access/nexustiles/nexustiles.py
copy to data-access/nexustiles/backends/nexusproto/backend.py
index a3aa61e..690b109 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/backends/nexusproto/backend.py
@@ -18,7 +18,7 @@ import logging
import sys
import json
from datetime import datetime
-from functools import wraps, reduce
+from functools import reduce
import numpy as np
import numpy.ma as ma
@@ -32,59 +32,23 @@ from .dao import S3Proxy
from .dao import SolrProxy
from .dao import ElasticsearchProxy
-from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from nexustiles.model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from nexustiles.exception import NexusTileServiceException
+from nexustiles.AbstractTileService import AbstractTileService
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
-logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
-logger = logging.getLogger("testing")
+logger = logging.getLogger(__name__)
-def tile_data(default_fetch=True):
- def tile_data_decorator(func):
- @wraps(func)
- def fetch_data_for_func(*args, **kwargs):
- metadatastore_start = datetime.now()
- metadatastore_docs = func(*args, **kwargs)
- metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds()
- tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs)
-
- cassandra_duration = 0
- if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
- if len(tiles) > 0:
- cassandra_start = datetime.now()
- args[0].fetch_data_for_tiles(*tiles)
- cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
-
- if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
- try:
- kwargs['metrics_callback'](cassandra=cassandra_duration,
- metadatastore=metadatastore_duration,
- num_tiles=len(tiles))
- except Exception as e:
- logger.error("Metrics callback '{}'raised an exception. Will continue anyway. " +
- "The exception was: {}".format(kwargs['metrics_callback'], e))
- return tiles
-
- return fetch_data_for_func
-
- return tile_data_decorator
-
-
-class NexusTileServiceException(Exception):
- pass
-
-
-class NexusTileService(object):
+class NexusprotoTileService(AbstractTileService):
def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
+ AbstractTileService.__init__(self, None)
self._datastore = None
self._metadatastore = None
self._config = configparser.RawConfigParser()
- self._config.read(NexusTileService._get_config_files('config/datastores.ini'))
+ self._config.read(NexusprotoTileService._get_config_files('config/datastores.ini'))
if config:
self.override_config(config)
@@ -120,11 +84,9 @@ class NexusTileService(object):
else:
return self._metadatastore.get_data_series_list()
- @tile_data()
def find_tile_by_id(self, tile_id, **kwargs):
return self._metadatastore.find_tile_by_id(tile_id)
- @tile_data()
def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
@@ -139,7 +101,6 @@ class NexusTileService(object):
metrics_callback(solr=duration)
return result
- @tile_data()
def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
"""
Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
@@ -171,18 +132,15 @@ class NexusTileService(object):
return tile
- @tile_data()
def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
rows=5000,
**kwargs)
- @tile_data()
def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
**kwargs)
- @tile_data()
def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
# Find tiles that fall in the given box in the Solr index
if type(start_time) is datetime:
@@ -192,7 +150,6 @@ class NexusTileService(object):
return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
end_time, **kwargs)
- @tile_data()
def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
# Find tiles that fall within the polygon in the Solr index
if 'sort' in list(kwargs.keys()):
@@ -203,7 +160,6 @@ class NexusTileService(object):
**kwargs)
return tiles
- @tile_data()
def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
"""
Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
@@ -232,7 +188,6 @@ class NexusTileService(object):
return tiles
- @tile_data()
def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
"""
The method will return tiles with the exact given bounds within the time range. It differs from
@@ -251,7 +206,6 @@ class NexusTileService(object):
end_time)
return tiles
- @tile_data()
def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
rows=5000,
@@ -315,6 +269,9 @@ class NexusTileService(object):
"""
tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'],
fetch_data=False, rows=len(tile_ids))
+
+ tiles = self._metadata_store_docs_to_tiles(*tiles)
+
polys = []
for tile in tiles:
polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat))
@@ -556,8 +513,8 @@ class NexusTileService(object):
var_names = [store_doc['tile_var_name_s']]
standard_name = store_doc.get(
- 'tile_standard_name_s',
- json.dumps([None] * len(var_names))
+ 'tile_standard_name_s',
+ json.dumps([None] * len(var_names))
)
if '[' in standard_name:
standard_names = json.loads(standard_name)
@@ -573,7 +530,6 @@ class NexusTileService(object):
except KeyError:
pass
-
if 'tile_var_name_ss' in store_doc:
tile.variables = []
for var_name in store_doc['tile_var_name_ss']:
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/backends/nexusproto/config/datastores.ini.default
similarity index 100%
rename from data-access/nexustiles/config/datastores.ini.default
rename to data-access/nexustiles/backends/nexusproto/config/datastores.ini.default
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/CassandraProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
diff --git a/data-access/nexustiles/dao/DynamoProxy.py b/data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/DynamoProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/DynamoProxy.py
diff --git a/data-access/nexustiles/dao/ElasticsearchProxy.py b/data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py
similarity index 100%
rename from data-access/nexustiles/dao/ElasticsearchProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py
diff --git a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py
similarity index 100%
rename from data-access/nexustiles/dao/S3Proxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/S3Proxy.py
diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
similarity index 99%
rename from data-access/nexustiles/dao/SolrProxy.py
rename to data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
index a072c6b..e466ad4 100644
--- a/data-access/nexustiles/dao/SolrProxy.py
+++ b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
@@ -217,7 +217,8 @@ class SolrProxy(object):
l.append({
"shortName": g,
"title": g,
- "tileCount": v
+ "tileCount": v,
+ "type": 'nexusproto'
})
l = sorted(l, key=lambda entry: entry["title"])
return l
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/nexusproto/dao/__init__.py
similarity index 100%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/nexusproto/dao/__init__.py
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/backends/zarr/__init__.py
similarity index 99%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/backends/zarr/__init__.py
index 6acb5d1..8afd240 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/backends/zarr/__init__.py
@@ -12,3 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
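The ZarrBackend added below has no per-tile store; it encodes each subset request as a URL-style "tile ID" and later recovers the bounds with yarl (see find_tiles_in_box and get_bounding_box in the next diff). A minimal sketch of that round trip, assuming the query keys the backend actually parses; the ID format itself is illustrative:

from yarl import URL

# hypothetical tile ID in the style produced by the backend's __to_url helper
tile_id = str(URL.build(scheme='zarr', host='my-dataset').with_query({
    'min_lat': '-10.0', 'max_lat': '10.0',
    'min_lon': '100.0', 'max_lon': '120.0',
    'min_time': '1456617600', 'max_time': '1456617600',
}))

q = URL(tile_id).query
bounds = (float(q['min_lon']), float(q['min_lat']),
          float(q['max_lon']), float(q['max_lat']))
print(tile_id, bounds)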
diff --git a/data-access/nexustiles/backends/zarr/backend.py b/data-access/nexustiles/backends/zarr/backend.py
new file mode 100644
index 0000000..e1d0a0c
--- /dev/null
+++ b/data-access/nexustiles/backends/zarr/backend.py
@@ -0,0 +1,532 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sys
+from datetime import datetime
+from urllib.parse import urlparse
+
+import numpy as np
+import numpy.ma as ma
+import s3fs
+import xarray as xr
+from nexustiles.AbstractTileService import AbstractTileService
+from nexustiles.exception import NexusTileServiceException
+from nexustiles.model.nexusmodel import Tile, BBox, TileVariable
+from pytz import timezone
+from shapely.geometry import MultiPolygon, box
+from yarl import URL
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+
+class ZarrBackend(AbstractTileService):
+ def __init__(self, dataset_name, path, config=None):
+ AbstractTileService.__init__(self, dataset_name)
+ self.__config = config if config is not None else {}
+
+ logger.info(f'Opening zarr backend at {path} for dataset {self._name}')
+
+ url = urlparse(path)
+
+ self.__url = path
+
+ self.__store_type = url.scheme
+ self.__host = url.netloc
+ self.__path = url.path
+
+ if 'variable' in config:
+ data_vars = config['variable']
+ elif 'variables' in config:
+ data_vars = config['variables']
+ else:
+ raise KeyError('Data variables not provided in config')
+
+ if isinstance(data_vars, str):
+ self.__variables = [data_vars]
+ elif isinstance(data_vars, list):
+ self.__variables = data_vars
+ else:
+ raise TypeError(f'Improper type for variables config: {type(data_vars)}')
+
+ self.__variables = [v.strip('\"\'') for v in self.__variables]
+
+ self.__longitude = config['coords']['longitude']
+ self.__latitude = config['coords']['latitude']
+ self.__time = config['coords']['time']
+
+ self.__depth = config['coords'].get('depth')
+
+ if self.__store_type in ['', 'file']:
+ store = self.__path
+ elif self.__store_type == 's3':
+ try:
+ aws_cfg = self.__config['aws']
+
+ if aws_cfg['public']:
+ # region = aws_cfg.get('region', 'us-west-2')
+ # store = f'https://{self.__host}.s3.{region}.amazonaws.com{self.__path}'
+ s3 = s3fs.S3FileSystem(True)
+ store = s3fs.S3Map(root=path, s3=s3, check=False)
+ else:
+ s3 = s3fs.S3FileSystem(False, key=aws_cfg['accessKeyID'], secret=aws_cfg['secretAccessKey'])
+ store = s3fs.S3Map(root=path, s3=s3, check=False)
+ except Exception as e:
+ logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}')
+ raise NexusTileServiceException(f'Cannot open S3 dataset ({e})')
+ else:
+ raise ValueError(self.__store_type)
+
+ try:
+ self.__ds: xr.Dataset = xr.open_zarr(store, consolidated=True)
+ except Exception as e:
+ logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}')
+ raise NexusTileServiceException(f'Cannot open dataset ({e})')
+
+ lats = self.__ds[self.__latitude].to_numpy()
+ delta = lats[1] - lats[0]
+
+ if delta < 0:
+ logger.warning(f'Latitude coordinate for {self._name} is in descending order. Flipping it to ascending')
+ self.__ds = self.__ds.isel({self.__latitude: slice(None, None, -1)})
+
+ def get_dataseries_list(self, simple=False):
+ ds = {
+ "shortName": self._name,
+ "title": self._name,
+ "type": "zarr"
+ }
+
+ if not simple:
+ min_date = self.get_min_time([])
+ max_date = self.get_max_time([])
+ ds['start'] = min_date
+ ds['end'] = max_date
+ ds['iso_start'] = datetime.utcfromtimestamp(min_date).strftime(ISO_8601)
+ ds['iso_end'] = datetime.utcfromtimestamp(max_date).strftime(ISO_8601)
+
+ ds['metadata'] = dict(self.__ds.attrs)
+
+ return [ds]
+
+ def find_tile_by_id(self, tile_id, **kwargs):
+ return [tile_id]
+
+ def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
+ return tile_ids
+
+ def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+ metrics_callback=None, **kwargs):
+ start = datetime.now()
+
+ if not isinstance(start_time, datetime):
+ start_time = datetime.utcfromtimestamp(start_time)
+
+ if not isinstance(end_time, datetime):
+ end_time = datetime.utcfromtimestamp(end_time)
+
+ sel = {
+ self.__latitude: slice(min_lat, max_lat),
+ self.__longitude: slice(min_lon, max_lon),
+ self.__time: slice(start_time, end_time)
+ }
+
+ times = self.__ds.sel(sel)[self.__time].to_numpy()
+
+ if np.issubdtype(times.dtype, np.datetime64):
+ times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int)
+
+ times = sorted(times.tolist())
+
+ if metrics_callback:
+ metrics_callback(backend=(datetime.now() - start).total_seconds())
+
+ return times
+
+ def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
+ """
+ Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
+ polygon and the closest day of year.
+
+ For example:
+ given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
+ search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
+
+ Valid matches:
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
+
+ Invalid matches:
+ minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
+ minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
+
+ :param bounding_polygon: The exact bounding polygon of tiles to search for
+ :param ds: The dataset name being searched
+ :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
+ :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
+ """
+
+ times = self.__ds[self.__time].to_numpy()
+
+ to_doy = lambda dt: datetime.utcfromtimestamp(int(dt)).timetuple().tm_yday
+
+ vfunc = np.vectorize(to_doy)
+ days_of_year = vfunc(times.astype(datetime) / 1e9)
+
+ try:
+ time = times[np.where(days_of_year <= day_of_year)[0][-1]].astype(datetime) / 1e9
+ except IndexError:
+ raise NexusTileServiceException(reason='No tiles matched')
+
+ min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
+
+ return self.find_tiles_in_box(
+ min_lat, max_lat, min_lon, max_lon, ds, time, time
+ )
+
+ def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, dataset, time, time, **kwargs)
+
+ def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
+ return self.find_tiles_in_polygon(bounding_polygon, dataset, time, time, **kwargs)
+
+ def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
+ if type(start_time) is datetime:
+ start_time = (start_time - EPOCH).total_seconds()
+ if type(end_time) is datetime:
+ end_time = (end_time - EPOCH).total_seconds()
+
+ params = {
+ 'min_lat': min_lat,
+ 'max_lat': max_lat,
+ 'min_lon': min_lon,
+ 'max_lon': max_lon
+ }
+
+ times = None
+
+ if 0 <= start_time <= end_time:
+ if kwargs.get('distinct', True):
+ times_asc = self.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time)
+ times = [(t, t) for t in times_asc]
+ else:
+ times = [(start_time, end_time)]
+
+ if 'depth' in kwargs:
+ params['depth'] = kwargs['depth']
+ elif 'min_depth' in kwargs or 'max_depth' in kwargs:
+ params['min_depth'] = kwargs.get('min_depth')
+ params['max_depth'] = kwargs.get('max_depth')
+
+ if times:
+ return [ZarrBackend.__to_url(self._name, min_time=t[0], max_time=t[1], **params) for t in times]
+ else:
+ return [ZarrBackend.__to_url(self._name, **params)]
+
+ def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=None, end_time=None, **kwargs):
+ # Find tiles that fall within the polygon in the Solr index
+ bounds = bounding_polygon.bounds
+
+ min_lon = bounds[0]
+ min_lat = bounds[1]
+ max_lon = bounds[2]
+ max_lat = bounds[3]
+
+ return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
+
+ def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
+ """
+ Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
+ :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
+ :param ds: The dataset name to search
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :return: A list of tiles
+ """
+ raise NotImplementedError()
+
+ def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
+ """
+ The method will return tiles with the exact given bounds within the time range. It differs from
+ find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to
+ doing a polygon intersection with the given bounds.
+
+ :param bounds: (minx, miny, maxx, maxy) bounds to search for
+ :param ds: Dataset name to search
+ :param start_time: Start time to search (seconds since epoch)
+ :param end_time: End time to search (seconds since epoch)
+ :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
+ :return:
+ """
+ min_lon = bounds[0]
+ min_lat = bounds[1]
+ max_lon = bounds[2]
+ max_lat = bounds[3]
+
+ return self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs)
+
+ def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ # Due to the precise nature of gridded Zarr's subsetting, it doesn't make sense to have a boundary region like
+ # this
+ return []
+
+ def get_min_max_time_by_granule(self, ds, granule_name):
+ raise NotImplementedError()
+
+ def get_dataset_overall_stats(self, ds):
+ raise NotImplementedError()
+
+ def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
+ raise NotImplementedError()
+
+ def get_bounding_box(self, tile_ids):
+ """
+ Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
+ :param tile_ids: List of tile ids
+ :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
+ """
+
+ bounds = [
+ (
+ float(URL(u).query['min_lon']),
+ float(URL(u).query['min_lat']),
+ float(URL(u).query['max_lon']),
+ float(URL(u).query['max_lat'])
+ )
+ for u in tile_ids
+ ]
+
+ poly = MultiPolygon([box(*b) for b in bounds])
+
+ return box(*poly.bounds)
+
+ def __get_ds_min_max_date(self):
+ min_date = self.__ds[self.__time].min().to_numpy()
+ max_date = self.__ds[self.__time].max().to_numpy()
+
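+ # datetime64 values count nanoseconds since the Unix epoch; convert to integer seconds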
+ if np.issubdtype(min_date.dtype, np.datetime64):
+ min_date = ((min_date - np.datetime64(EPOCH)) / 1e9).astype(int).item()
+
+ if np.issubdtype(max_date.dtype, np.datetime64):
+ max_date = ((max_date - np.datetime64(EPOCH)) / 1e9).astype(int).item()
+
+ return min_date, max_date
+
+ def get_min_time(self, tile_ids, ds=None):
+ """
+ Get the minimum tile date from the list of tile ids
+ :param tile_ids: List of tile ids
+ :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+ :return: long time in seconds since epoch
+ """
+ times = [int(t) for t in [URL(tid).query.get('min_time') for tid in tile_ids] if t is not None]
+
+ if len(times) == 0:
+ min_date, max_date = self.__get_ds_min_max_date()
+ return min_date
+ else:
+ return min(times)
+
+ def get_max_time(self, tile_ids, ds=None):
+ """
+ Get the maximum tile date from the list of tile ids
+ :param tile_ids: List of tile ids
+ :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
+ :return: long time in seconds since epoch
+ """
+ times = [int(t) for t in [URL(tid).query.get('max_time') for tid in tile_ids] if t is not None]
+
+ if len(times) == 0:
+ min_date, max_date = self.__get_ds_min_max_date()
+ return max_date
+ else:
+ return max(times)
+
+ def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
+ """
+ Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range.
+ :param bounding_polygon: The bounding polygon of tiles to search for
+ :param ds: The dataset name to search
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon
+ """
+ raise NotImplementedError()
+
+ def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+ """
+ Return number of tiles that match search criteria.
+ :param ds: The dataset name to search
+ :param bounding_polygon: The polygon to search for tiles
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :param metadata: List of metadata values to search for tiles, e.g. ["river_id_i:1", "granule_s:granule_name"]
+ :return: number of tiles that match search criteria
+ """
+ raise NotImplementedError()
+
+ def fetch_data_for_tiles(self, *tiles):
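+ # Tiles here are pseudo-tiles parsed from nts:// URLs; populate each one
+ # in place by subsetting the opened Zarr dataset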
+ for tile in tiles:
+ self.__fetch_data_for_tile(tile)
+
+ return tiles
+
+ def __fetch_data_for_tile(self, tile: Tile):
+ bbox: BBox = tile.bbox
+
+ min_lat = None
+ min_lon = None
+ max_lat = None
+ max_lon = None
+
+ min_time = tile.min_time
+ max_time = tile.max_time
+
+ if bbox:
+ min_lat = bbox.min_lat
+ min_lon = bbox.min_lon
+ max_lat = bbox.max_lat
+ max_lon = bbox.max_lon
+
+ sel_g = {
+ self.__latitude: slice(min_lat, max_lat),
+ self.__longitude: slice(min_lon, max_lon),
+ }
+
+ sel_t = {}
+
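+ # Three cases: no time bounds -> skip time selection entirely; a single
+ # instant -> nearest-neighbor lookup; otherwise a closed time slice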
+ if min_time is None and max_time is None:
+ sel_t = None
+ method = None
+ elif min_time == max_time:
+ sel_t[self.__time] = [min_time] # List, otherwise self.__time dim will be dropped
+ method = 'nearest'
+ else:
+ sel_t[self.__time] = slice(min_time, max_time)
+ method = None
+
+ tile.variables = [
+ TileVariable(v, v) for v in self.__variables
+ ]
+
+ matched = self.__ds.sel(sel_g)
+
+ if sel_t is not None:
+ matched = matched.sel(sel_t, method=method)
+
+ tile.latitudes = ma.masked_invalid(matched[self.__latitude].to_numpy())
+ tile.longitudes = ma.masked_invalid(matched[self.__longitude].to_numpy())
+
+ times = matched[self.__time].to_numpy()
+
+ if np.issubdtype(times.dtype, np.datetime64):
+ times = ((times - np.datetime64(EPOCH)) / 1e9).astype(int)
+
+ tile.times = ma.masked_invalid(times)
+
+ var_data = [matched[var].to_numpy() for var in self.__variables]
+
+ if len(self.__variables) > 1:
+ tile.data = ma.masked_invalid(var_data)
+ tile.is_multi = True
+ else:
+ tile.data = ma.masked_invalid(var_data[0])
+ tile.is_multi = False
+
+
+ def _metadata_store_docs_to_tiles(self, *store_docs):
+ return [ZarrBackend.__nts_url_to_tile(d) for d in store_docs]
+
+ @staticmethod
+ def __nts_url_to_tile(nts_url):
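+ # Rebuild a Tile stub from an nts:// virtual tile id; bbox and time bounds
+ # are optional query params, so missing keys are simply skipped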
+ tile = Tile()
+
+ url = URL(nts_url)
+
+ tile.tile_id = nts_url
+
+ try:
+ min_lat = float(url.query['min_lat'])
+ min_lon = float(url.query['min_lon'])
+ max_lat = float(url.query['max_lat'])
+ max_lon = float(url.query['max_lon'])
+
+ tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon)
+ except KeyError:
+ pass
+
+ tile.dataset = url.path
+ tile.dataset_id = url.path
+
+ try:
+ tile.min_time = datetime.utcfromtimestamp(int(url.query['min_time']))
+ except KeyError:
+ pass
+
+ try:
+ tile.max_time = datetime.utcfromtimestamp(int(url.query['max_time']))
+ except KeyError:
+ pass
+
+ tile.meta_data = {}
+
+ return tile
+
+ @staticmethod
+ def __to_url(dataset, **kwargs):
+ if 'dataset' in kwargs:
+ del kwargs['dataset']
+
+ if 'ds' in kwargs:
+ del kwargs['ds']
+
+ params = {}
+
+ # If any params are numpy dtypes, extract them to base python types
+ for kw in kwargs:
+ v = kwargs[kw]
+
+ if v is None:
+ continue
+
+ if isinstance(v, np.generic):
+ v = v.item()
+
+ params[kw] = v
+
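+ # The resulting URL doubles as the tile id, e.g. something like (hypothetical
+ # values): nts://MY_DATASET?min_lat=-10.0&max_lat=10.0&min_time=0&max_time=86400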
+ return str(URL.build(
+ scheme='nts',
+ host='',
+ path=dataset,
+ query=params
+ ))
+
+
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/config/datasets.ini.default
similarity index 93%
copy from data-access/nexustiles/dao/__init__.py
copy to data-access/nexustiles/config/datasets.ini.default
index 6acb5d1..9f586cf 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/config/datasets.ini.default
@@ -12,3 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+[solr]
+host=http://localhost:8983
+core=nexusdatasets
diff --git a/data-access/nexustiles/dao/__init__.py b/data-access/nexustiles/exception.py
similarity index 86%
rename from data-access/nexustiles/dao/__init__.py
rename to data-access/nexustiles/exception.py
index 6acb5d1..d6ed2c6 100644
--- a/data-access/nexustiles/dao/__init__.py
+++ b/data-access/nexustiles/exception.py
@@ -12,3 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+class NexusTileServiceException(Exception):
+ def __init__(self, reason):
+ Exception.__init__(self, reason)
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index a3aa61e..b4fd6bb 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -14,25 +14,33 @@
# limitations under the License.
import configparser
+import json
import logging
import sys
-import json
+import threading
from datetime import datetime
-from functools import wraps, reduce
+from functools import reduce, wraps
+from time import sleep
+from typing import Dict, Union
import numpy as np
import numpy.ma as ma
import pkg_resources
+import pysolr
from pytz import timezone, UTC
-from shapely.geometry import MultiPolygon, box
+from shapely.geometry import box
+from webservice.webmodel import DatasetNotFoundException, NexusProcessingException
+from webservice.NexusHandler import nexus_initializer
+from yarl import URL
+
+from .AbstractTileService import AbstractTileService
+from .backends.nexusproto.backend import NexusprotoTileService
+from .backends.zarr.backend import ZarrBackend
+from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
-from .dao import CassandraProxy
-from .dao import DynamoProxy
-from .dao import S3Proxy
-from .dao import SolrProxy
-from .dao import ElasticsearchProxy
+from .exception import NexusTileServiceException
-from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
+from requests.structures import CaseInsensitiveDict
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -40,7 +48,7 @@ logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
-logger = logging.getLogger("testing")
+logger = logging.getLogger("nexus-tile-svc")
def tile_data(default_fetch=True):
@@ -50,13 +58,27 @@ def tile_data(default_fetch=True):
metadatastore_start = datetime.now()
metadatastore_docs = func(*args, **kwargs)
metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds()
- tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs)
+
+ # Try to determine source dataset to route calls to proper backend
+ guessed_dataset = None
+
+ if 'ds' in kwargs:
+ guessed_dataset = kwargs['ds']
+ elif 'dataset' in kwargs:
+ guessed_dataset = kwargs['dataset']
+ else:
+ for arg in args:
+ if isinstance(arg, str) and arg in NexusTileService.backends:
+ guessed_dataset = arg
+ break
+
+ tiles = NexusTileService._get_backend(guessed_dataset)._metadata_store_docs_to_tiles(*metadatastore_docs)
cassandra_duration = 0
if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch):
if len(tiles) > 0:
cassandra_start = datetime.now()
- args[0].fetch_data_for_tiles(*tiles)
+ NexusTileService._get_backend(guessed_dataset).fetch_data_for_tiles(*tiles)
cassandra_duration += (datetime.now() - cassandra_start).total_seconds()
if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None:
@@ -74,38 +96,280 @@ def tile_data(default_fetch=True):
return tile_data_decorator
-class NexusTileServiceException(Exception):
- pass
+def catch_not_implemented(func):
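+ # Translate a backend's NotImplementedError into a NexusTileServiceException
+ # so unsupported operations surface as a clean service-level error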
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except NotImplementedError:
+ raise NexusTileServiceException('Action unsupported by backend')
+
+ return wrapper
-class NexusTileService(object):
- def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
- self._datastore = None
- self._metadatastore = None
+SOLR_LOCK = threading.Lock()
+DS_LOCK = threading.Lock()
+thread_local = threading.local()
+
+
+@nexus_initializer
+class NTSInitializer:
+ def __init__(self):
+ self._log = logger.getChild('init')
+
+ def init(self, config):
+ self._log.info('*** RUNNING NTS INITIALIZATION ***')
+ NexusTileService(config)
+
+
+class NexusTileService:
+ backends: Dict[Union[None, str], Dict[str, Union[AbstractTileService, bool]]] = {}
+
+ ds_config = None
+
+ __update_thread = None
+
+ @staticmethod
+ def __update_datasets_loop():
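+ # Refresh the backend map hourly so datasets added or removed in Solr
+ # become (un)available without restarting the service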
+ while True:
+ with DS_LOCK:
+ NexusTileService._update_datasets()
+ sleep(3600)
+ def __init__(self, config=None):
self._config = configparser.RawConfigParser()
- self._config.read(NexusTileService._get_config_files('config/datastores.ini'))
+ self._config.read(NexusTileService._get_config_files('config/datasets.ini'))
+
+ self._alg_config = config
+
+ if not NexusTileService.backends:
+ NexusTileService.ds_config = configparser.RawConfigParser()
+ NexusTileService.ds_config.read(NexusTileService._get_config_files('config/datasets.ini'))
+
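+ # nexusproto remains the default backend; register it under None (fallback)
+ # as well as its explicit alias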
+ default_backend = {"backend": NexusprotoTileService(False, False, config), 'up': True}
+
+ NexusTileService.backends[None] = default_backend
+ NexusTileService.backends['__nexusproto__'] = default_backend
if config:
self.override_config(config)
- if not skipDatastore:
- datastore = self._config.get("datastore", "store")
- if datastore == "cassandra":
- self._datastore = CassandraProxy.CassandraProxy(self._config)
- elif datastore == "s3":
- self._datastore = S3Proxy.S3Proxy(self._config)
- elif datastore == "dynamo":
- self._datastore = DynamoProxy.DynamoProxy(self._config)
+ if not NexusTileService.__update_thread:
+ NexusTileService.__update_thread = threading.Thread(
+ target=NexusTileService.__update_datasets_loop,
+ name='dataset_update',
+ daemon=True
+ )
+
+ logger.info('Starting dataset refresh thread')
+
+ NexusTileService.__update_thread.start()
+
+ @staticmethod
+ def _get_backend(dataset_s) -> AbstractTileService:
+ with DS_LOCK:
+ if dataset_s not in NexusTileService.backends:
+ logger.warning(f'Dataset {dataset_s} not currently loaded. Checking to see if it was recently '
+ f'added')
+ NexusTileService._update_datasets()
+ if dataset_s not in NexusTileService.backends:
+ raise DatasetNotFoundException(reason=f'Dataset {dataset_s} is not currently loaded/ingested')
+
+ b = NexusTileService.backends[dataset_s]
+
+ # if not b['up']:
+ # success = b['backend'].try_connect()
+ #
+ # if not success:
+ # raise NexusProcessingException(reason=f'Dataset {dataset_s} is currently unavailable')
+ # else:
+ # NexusTileService.backends[dataset_s]['up'] = True
+
+ return b['backend']
+
+
+ @staticmethod
+ def _get_datasets_store():
+ solr_url = NexusTileService.ds_config.get("solr", "host")
+ solr_core = NexusTileService.ds_config.get("solr", "core")
+ solr_kwargs = {}
+
+ if NexusTileService.ds_config.has_option("solr", "time_out"):
+ solr_kwargs["timeout"] = NexusTileService.ds_config.get("solr", "time_out")
+
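+ # Reuse one pysolr connection per thread; SOLR_LOCK guards its creation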
+ with SOLR_LOCK:
+ solrcon = getattr(thread_local, 'solrcon', None)
+ if solrcon is None:
+ solr_url = '%s/solr/%s' % (solr_url, solr_core)
+ solrcon = pysolr.Solr(solr_url, **solr_kwargs)
+ thread_local.solrcon = solrcon
+
+ return solrcon
+
+ @staticmethod
+ def _update_datasets():
+ update_logger = logging.getLogger("nexus-tile-svc.backends")
+ solrcon = NexusTileService._get_datasets_store()
+
+ update_logger.info('Executing Solr query to check for new datasets')
+
+ present_datasets = {None, '__nexusproto__'}
+ next_cursor_mark = '*'
+
+ added_datasets = 0
+
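+ # Page through every dataset doc using Solr cursorMark deep paging;
+ # the loop ends once the cursor stops advancing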
+ while True:
+ response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc')
+
+ try:
+ response_cursor_mark = response.nextCursorMark
+ except AttributeError:
+ break
+
+ if response_cursor_mark == next_cursor_mark:
+ break
else:
- raise ValueError("Error reading datastore from config file")
+ next_cursor_mark = response_cursor_mark
+
+ for dataset in response.docs:
+ d_id = dataset['dataset_s']
+ store_type = dataset.get('store_type_s', 'nexusproto')
+
+ present_datasets.add(d_id)
+
+ if d_id in NexusTileService.backends:
+ continue
+ # is_up = NexusTileService.backends[d_id]['backend'].try_connect()
+
+ added_datasets += 1
+
+ if store_type == 'nexus_proto' or store_type == 'nexusproto':
+ update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend")
+ NexusTileService.backends[d_id] = NexusTileService.backends[None]
+ elif store_type == 'zarr':
+ update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend")
+
+ ds_config = json.loads(dataset['config'][0])
+ try:
+ NexusTileService.backends[d_id] = {
+ 'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config),
+ 'up': True
+ }
+ except NexusTileServiceException:
+ added_datasets -= 1
+ else:
+ update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}')
+ added_datasets -= 1
+
+ removed_datasets = set(NexusTileService.backends.keys()).difference(present_datasets)
+
+ if len(removed_datasets) > 0:
+ update_logger.info(f'{len(removed_datasets)} old datasets marked for removal')
+
+ for dataset in removed_datasets:
+ update_logger.info(f"Removing dataset {dataset}")
+ del NexusTileService.backends[dataset]
+
+ update_logger.info(f'Finished dataset update: {added_datasets} added, {len(removed_datasets)} removed, '
+ f'{len(NexusTileService.backends) - 2} total')
+
+ # Update cfg (ie, creds) of dataset
+ @staticmethod
+ def user_ds_update(name, config):
+ solr = NexusTileService._get_datasets_store()
+
+ docs = solr.search(f'dataset_s:{name}').docs
+
+ if len(docs) != 1:
+ raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}')
+
+ ds = docs[0]
+
+ if 'source_s' not in ds or ds['source_s'] == 'collection_config':
+ raise ValueError('Provided dataset is managed by the collection config and cannot be updated')
+
+ config_dict = json.loads(ds['config'][0])
+
+ config_dict['config'] = config
+
+ solr.delete(id=ds['id'])
+ solr.add([{
+ 'id': name,
+ 'dataset_s': name,
+ 'latest_update_l': int(datetime.now().timestamp()),
+ 'store_type_s': ds['store_type_s'],
+ 'config': json.dumps(config_dict),
+ 'source_s': 'user_added'
+ }])
+ solr.commit()
+
+ logger.info(f'Updated dataset {name} in Solr. Updating backends')
+
+ with DS_LOCK:
+ NexusTileService._update_datasets()
+
+ return {'success': True}
+
+ # Add dataset + backend
+ @staticmethod
+ def user_ds_add(name, path, config, type='zarr'):
+ solr = NexusTileService._get_datasets_store()
+
+ docs = solr.search(f'dataset_s:{name}').docs
+
+ if len(docs) > 0:
+ raise ValueError(f'Dataset {name} already exists')
+
+ config_dict = {
+ 'path': path,
+ 'config': config
+ }
+
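+ # Register the dataset in the datasets Solr core; the refresh below will
+ # pick it up and open the corresponding backend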
+ solr.add([{
+ 'id': name,
+ 'dataset_s': name,
+ 'latest_update_l': int(datetime.now().timestamp()),
+ 'store_type_s': type,
+ 'config': json.dumps(config_dict),
+ 'source_s': 'user_added'
+ }])
+ solr.commit()
+
+ logger.info(f'Added dataset {name} to Solr. Updating backends')
+
+ with DS_LOCK:
+ NexusTileService._update_datasets()
+
+ return {'success': True}
- if not skipMetadatastore:
- metadatastore = self._config.get("metadatastore", "store", fallback='solr')
- if metadatastore == "solr":
- self._metadatastore = SolrProxy.SolrProxy(self._config)
- elif metadatastore == "elasticsearch":
- self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config)
+ # Delete dataset backend (error if it's a hardcoded one)
+ @staticmethod
+ def user_ds_delete(name):
+ solr = NexusTileService._get_datasets_store()
+
+ docs = solr.search(f'dataset_s:{name}').docs
+
+ if len(docs) != 1:
+ raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}')
+
+ ds = docs[0]
+
+ if 'source_s' not in ds or ds['source_s'] == 'collection_config':
+ raise ValueError('Provided dataset is managed by the collection config and cannot be deleted')
+
+ solr.delete(id=ds['id'])
+ solr.commit()
+
+ logger.info(f'Removed dataset {name} from Solr. Updating backends')
+
+ with DS_LOCK:
+ NexusTileService._update_datasets()
+
+ return {'success': True}
def override_config(self, config):
for section in config.sections():
@@ -113,109 +377,90 @@ class NexusTileService(object):
for option in config.options(section):
if config.get(section, option) is not None:
self._config.set(section, option, config.get(section, option))
+ if NexusTileService.ds_config.has_section(section): # only override preexisting sections; ignore the rest
+ for option in config.options(section):
+ if config.get(section, option) is not None:
+ NexusTileService.ds_config.set(section, option, config.get(section, option))
def get_dataseries_list(self, simple=False):
- if simple:
- return self._metadatastore.get_data_series_list_simple()
- else:
- return self._metadatastore.get_data_series_list()
+ datasets = []
+ for backend in set([b['backend'] for b in NexusTileService.backends.values() if b['up']]):
+ datasets.extend(backend.get_dataseries_list(simple))
+
+ return datasets
+
@tile_data()
+ @catch_not_implemented
def find_tile_by_id(self, tile_id, **kwargs):
- return self._metadatastore.find_tile_by_id(tile_id)
+ tile = URL(tile_id)
+
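+ # nts:// ids carry the dataset name in the URL path; anything else is
+ # treated as a legacy nexusproto tile id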
+ if tile.scheme == 'nts':
+ return NexusTileService._get_backend(tile.path).find_tile_by_id(tile_id)
+ else:
+ return NexusTileService._get_backend('__nexusproto__').find_tile_by_id(tile_id)
@tile_data()
+ @catch_not_implemented
def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
- return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
+ if ds is None:
+ kwargs['fetch_data'] = False
+ return [self.find_tile_by_id(tid, **kwargs) for tid in tile_ids]
+ return NexusTileService._get_backend(ds).find_tiles_by_id(tile_ids, ds=ds, **kwargs)
+ @catch_not_implemented
def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
metrics_callback=None, **kwargs):
- start = datetime.now()
- result = self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time,
- end_time,
- **kwargs)
- duration = (datetime.now() - start).total_seconds()
- if metrics_callback:
- metrics_callback(solr=duration)
- return result
+ return NexusTileService._get_backend(dataset).find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon,
+ dataset, start_time, end_time,
+ metrics_callback, **kwargs)
@tile_data()
+ @catch_not_implemented
def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs):
- """
- Given a bounding polygon, dataset, and day of year, find tiles in that dataset with the same bounding
- polygon and the closest day of year.
-
- For example:
- given a polygon minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; and day of year=32
- search for first tile in MY_DS with identical bbox and day_of_year <= 32 (sorted by day_of_year desc)
-
- Valid matches:
- minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32
- minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30
-
- Invalid matches:
- minx=1, miny=0, maxx=2, maxy=1; dataset=MY_DS; day of year = 32
- minx=0, miny=0, maxx=1, maxy=1; dataset=MY_OTHER_DS; day of year = 32
- minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 30 if minx=0, miny=0, maxx=1, maxy=1; dataset=MY_DS; day of year = 32 also exists
-
- :param bounding_polygon: The exact bounding polygon of tiles to search for
- :param ds: The dataset name being searched
- :param day_of_year: Tile day of year to search for, tile nearest to this day (without going over) will be returned
- :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
- """
- try:
- tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds,
- day_of_year)
- except IndexError:
- raise NexusTileServiceException("No tile found.").with_traceback(sys.exc_info()[2])
-
- return tile
+ return NexusTileService._get_backend(ds).find_tile_by_polygon_and_most_recent_day_of_year(
+ bounding_polygon, ds, day_of_year, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
- return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
- rows=5000,
- **kwargs)
+ return NexusTileService._get_backend(dataset).find_all_tiles_in_box_at_time(
+ min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
- return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
- **kwargs)
+ return NexusTileService._get_backend(dataset).find_all_tiles_in_polygon_at_time(
+ bounding_polygon, dataset, time, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
# Find tiles that fall in the given box in the Solr index
if type(start_time) is datetime:
start_time = (start_time - EPOCH).total_seconds()
if type(end_time) is datetime:
end_time = (end_time - EPOCH).total_seconds()
- return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
- end_time, **kwargs)
+
+ return NexusTileService._get_backend(ds).find_tiles_in_box(
+ min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
- # Find tiles that fall within the polygon in the Solr index
- if 'sort' in list(kwargs.keys()):
- tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
- else:
- tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time,
- end_time,
- **kwargs)
- return tiles
+ return NexusTileService._get_backend(ds).find_tiles_in_polygon(
+ bounding_polygon, ds, start_time, end_time, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
- """
- Return list of tiles whose metadata matches the specified metadata, start_time, end_time.
- :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
- :param ds: The dataset name to search
- :param start_time: The start time to search for tiles
- :param end_time: The end time to search for tiles
- :return: A list of tiles
- """
- tiles = self._metadatastore.find_all_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs)
-
- return tiles
+ return NexusTileService._get_backend(ds).find_tiles_by_metadata(
+ metadata, ds, start_time, end_time, **kwargs
+ )
def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs):
"""
@@ -233,6 +478,7 @@ class NexusTileService(object):
return tiles
@tile_data()
+ @catch_not_implemented
def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs):
"""
The method will return tiles with the exact given bounds within the time range. It differs from
@@ -246,16 +492,16 @@ class NexusTileService(object):
:param kwargs: fetch_data: True/False = whether or not to retrieve tile data
:return:
"""
- tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds,
- start_time,
- end_time)
- return tiles
+ return NexusTileService._get_backend(ds).find_tiles_by_exact_bounds(
+ bounds, ds, start_time, end_time, **kwargs
+ )
@tile_data()
+ @catch_not_implemented
def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
- return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
- rows=5000,
- **kwargs)
+ return NexusTileService._get_backend(dataset).find_all_boundary_tiles_at_time(
+ min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+ )
def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
**kwargs):
@@ -275,13 +521,15 @@ class NexusTileService(object):
return tiles
+ @catch_not_implemented
def get_min_max_time_by_granule(self, ds, granule_name):
- start_time, end_time = self._metadatastore.find_min_max_date_from_granule(ds, granule_name)
-
- return start_time, end_time
+ return NexusTileService._get_backend(ds).get_min_max_time_by_granule(
+ ds, granule_name
+ )
+ @catch_not_implemented
def get_dataset_overall_stats(self, ds):
- return self._metadatastore.get_data_series_stats(ds)
+ return NexusTileService._get_backend(ds).get_dataset_overall_stats(ds)
def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs)
@@ -301,24 +549,19 @@ class NexusTileService(object):
return tiles
+ @catch_not_implemented
def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
- tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
- **kwargs)
-
- return tiles
+ return NexusTileService._get_backend(dataset).get_stats_within_box_at_time(
+ min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs
+ )
- def get_bounding_box(self, tile_ids):
+ def get_bounding_box(self, tile_ids, ds=None):
"""
Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids.
:param tile_ids: List of tile ids
:return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles
"""
- tiles = self.find_tiles_by_id(tile_ids, fl=['tile_min_lat', 'tile_max_lat', 'tile_min_lon', 'tile_max_lon'],
- fetch_data=False, rows=len(tile_ids))
- polys = []
- for tile in tiles:
- polys.append(box(tile.bbox.min_lon, tile.bbox.min_lat, tile.bbox.max_lon, tile.bbox.max_lat))
- return box(*MultiPolygon(polys).bounds)
+ return NexusTileService._get_backend(ds).get_bounding_box(tile_ids)
def get_min_time(self, tile_ids, ds=None):
"""
@@ -327,8 +570,7 @@ class NexusTileService(object):
:param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
:return: long time in seconds since epoch
"""
- min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds)
- return int((min_time - EPOCH).total_seconds())
+ return NexusTileService._get_backend(ds).get_min_time(tile_ids, ds)
def get_max_time(self, tile_ids, ds=None):
"""
@@ -337,8 +579,7 @@ class NexusTileService(object):
:param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
:return: long time in seconds since epoch
"""
- max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds)
- return int((max_time - EPOCH).total_seconds())
+ return int(NexusTileService._get_backend(ds).get_max_time(tile_ids))
def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
"""
@@ -352,8 +593,19 @@ class NexusTileService(object):
bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time)
return [box(*b) for b in bounds]
- def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles):
+ def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
+ """
+ Return number of tiles that match search criteria.
+ :param ds: The dataset name to search
+ :param bounding_polygon: The polygon to search for tiles
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :param metadata: List of metadata values to search for tiles, e.g. ["river_id_i:1", "granule_s:granule_name"]
+ :return: number of tiles that match search criteria
+ """
+ return NexusTileService._get_backend(ds).get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs)
+ def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles):
for tile in tiles:
tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat)
tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon)
@@ -438,45 +690,12 @@ class NexusTileService(object):
return tiles
- def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
- """
- Return number of tiles that match search criteria.
- :param ds: The dataset name to search
- :param bounding_polygon: The polygon to search for tiles
- :param start_time: The start time to search for tiles
- :param end_time: The end time to search for tiles
- :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
- :return: number of tiles that match search criteria
- """
- return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs)
-
def fetch_data_for_tiles(self, *tiles):
+ dataset = tiles[0].dataset
- nexus_tile_ids = set([tile.tile_id for tile in tiles])
- matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids)
-
- tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data}
-
- missing_data = nexus_tile_ids.difference(list(tile_data_by_id.keys()))
- if len(missing_data) > 0:
- raise Exception("Missing data for tile_id(s) %s." % missing_data)
-
- for a_tile in tiles:
- lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta()
-
- a_tile.latitudes = lats
- a_tile.longitudes = lons
- a_tile.times = times
- a_tile.data = data
- a_tile.meta_data = meta
- a_tile.is_multi = is_multi_var
-
- del (tile_data_by_id[a_tile.tile_id])
-
- return tiles
+ return NexusTileService._get_backend(dataset).fetch_data_for_tiles(*tiles)
def _metadata_store_docs_to_tiles(self, *store_docs):
-
tiles = []
for store_doc in store_docs:
tile = Tile()
@@ -573,7 +792,6 @@ class NexusTileService(object):
except KeyError:
pass
-
if 'tile_var_name_ss' in store_doc:
tile.variables = []
for var_name in store_doc['tile_var_name_ss']:
@@ -588,13 +806,6 @@ class NexusTileService(object):
return tiles
- def pingSolr(self):
- status = self._metadatastore.ping()
- if status and status["status"] == "OK":
- return True
- else:
- return False
-
@staticmethod
def _get_config_files(filename):
log = logging.getLogger(__name__)
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index f91f180..05b7894 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -21,4 +21,11 @@ urllib3==1.26.2
requests
nexusproto
Shapely
-numpy==1.24.3
+s3fs==2022.5.0
+fsspec==2022.5.0
+botocore==1.24.21
+aiohttp==3.8.1
+xarray~=2022.3.0
+zarr>=2.11.3
+pandas<2.1.0rc0 # Temporary restriction because 2.1.0rc0 fails to build
+
diff --git a/data-access/setup.py b/data-access/setup.py
index ab0248f..e539e1e 100644
--- a/data-access/setup.py
+++ b/data-access/setup.py
@@ -12,11 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import setuptools
from setuptools import setup
-with open('../VERSION.txt', 'r') as f:
- __version__ = f.read()
+try:
+ with open('../VERSION.txt', 'r') as f:
+ __version__ = f.read()
+except OSError: # VERSION.txt is not present when building locally
+ __version__ = None
with open('requirements.txt') as f:
@@ -32,8 +35,13 @@ setup(
description="NEXUS API.",
long_description=open('README.md').read(),
- packages=['nexustiles', 'nexustiles.model', 'nexustiles.dao'],
- package_data={'nexustiles': ['config/datastores.ini.default', 'config/datastores.ini']},
+ packages=setuptools.find_packages(), # ['nexustiles', 'nexustiles.model', 'nexustiles.dao'],
+ package_data={
+ 'nexustiles':
+ ['config/datasets.ini.default', 'config/datasets.ini'],
+ 'nexustiles.backends.nexusproto':
+ ['config/datastores.ini.default', 'config/datastores.ini']
+ },
platforms='any',
python_requires='~=3.8',
install_requires=pip_requirements,