You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by nc...@apache.org on 2021/09/29 16:19:26 UTC
[incubator-sdap-nexus] branch master updated: SDAP-338: Update
match up implementation to support multi-variable tiles (#132)
This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new dc0f0d1 SDAP-338: Update match up implementation to support multi-variable tiles (#132)
dc0f0d1 is described below
commit dc0f0d17152c48b499561273ef84086e31dd2677
Author: Stepheny Perez <sk...@users.noreply.github.com>
AuthorDate: Wed Sep 29 09:19:19 2021 -0700
SDAP-338: Update match up implementation to support multi-variable tiles (#132)
* Added support for multi-variable swath tiles
* Converted Tile to dataclass
* Added matchup support for grid_multi_variable_tile
* Updated matchup to work with existing unit tests
* Update data point dict names to variable_name and variable_value
* Fixed lint warnings
* Added test case for multi-var sat to multi-var sat
* Added cf_variable_name to matchup response
* get_indices will combine masks of all variables
* Only add data point if valid
* Updated matchup to work with new multi-variable solr doc layout
* Backwards compatibility for loading solr doc into tile
* Improved backwards compatibility when loading solr doc into Tile. Works even when the standard name field isn't present
---
analysis/tests/algorithms_spark/test_matchup.py | 319 ++++++++++++++++++------
analysis/webservice/algorithms_spark/Matchup.py | 98 ++++++--
data-access/nexustiles/dao/CassandraProxy.py | 123 ++++++++-
data-access/nexustiles/model/nexusmodel.py | 94 +++++--
data-access/nexustiles/nexustiles.py | 56 ++++-
5 files changed, 570 insertions(+), 120 deletions(-)
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index 1fcb5ce..f0f9c0d 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -23,11 +23,16 @@ import mock
import numpy as np
import pytest
import webservice.algorithms_spark.Matchup as matchup
-from nexustiles.model.nexusmodel import Tile
+from nexustiles.model.nexusmodel import Tile, TileVariable
from pyspark.sql import SparkSession
from shapely import wkt
from shapely.geometry import box
-from webservice.algorithms_spark.Matchup import DomsPoint, Matchup
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint
+
+
+class MockSparkParam:
+ def __init__(self, value):
+ self.value = value
@pytest.fixture(scope='function')
@@ -37,6 +42,60 @@ def test_dir():
yield test_data_dir
+@pytest.fixture(scope='function')
+def test_tile():
+ yield Tile(
+ tile_id='test-tile',
+ bbox='',
+ dataset='test-dataset',
+ dataset_id='test-dataset',
+ granule='test-granule',
+ min_time='2020-07-28T00:00:00',
+ max_time='2020-07-28T00:00:00',
+ section_spec='2020-07-28T00:00:00',
+ meta_data={},
+ is_multi=True
+ )
+
+
+@pytest.fixture(scope='function')
+def test_matchup_args():
+ tile_ids = [1]
+ polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+ primary_ds_name = 'primary-ds-name'
+ matchup_ds_names = 'test'
+ parameter = 'sst'
+ depth_min = 0.0
+ depth_max = 1.0
+ time_tolerance = 3.0
+ radius_tolerance = 1000000.0
+ platforms = '1,2,3,4,5,6,7,8,9'
+
+ yield dict(
+ tile_ids=tile_ids,
+ primary_b=MockSparkParam(primary_ds_name),
+ matchup_b=MockSparkParam(matchup_ds_names),
+ parameter_b=MockSparkParam(parameter),
+ tt_b=MockSparkParam(time_tolerance),
+ rt_b=MockSparkParam(radius_tolerance),
+ platforms_b=MockSparkParam(platforms),
+ bounding_wkt_b=MockSparkParam(polygon_wkt),
+ depth_min_b=MockSparkParam(depth_min),
+ depth_max_b=MockSparkParam(depth_max)
+ )
+
+
+def setup_mock_tile_service(tile):
+ tile_service_factory = mock.MagicMock()
+ tile_service = mock.MagicMock()
+ tile_service_factory.return_value = tile_service
+ tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
+ tile_service.get_min_time.return_value = 1627490285
+ tile_service.get_max_time.return_value = 1627490285
+ tile_service.mask_tiles_to_polygon.return_value = [tile]
+ return tile_service_factory
+
+
def test_doms_point_is_pickleable():
edge_point = {
'id': 'argo-profiles-5903995(46, 0)',
@@ -55,7 +114,7 @@ def test_doms_point_is_pickleable():
assert pickle.dumps(point) is not None
-def test_calc():
+def test_calc(test_matchup_args):
"""
Assert that the expected functions are called during the matchup
calculation and that the results are formatted as expected.
@@ -115,15 +174,22 @@ def test_calc():
d1_ins = DomsPoint(**doms_point_args)
d2_ins = DomsPoint(**doms_point_args)
- d1_sat.satellite_var_name = 'sea_surface_temperature'
- d2_sat.satellite_var_name = 'sea_surface_temperature'
- d1_ins.satellite_var_name = 'sea_surface_temperature'
- d2_ins.satellite_var_name = 'sea_surface_temperature'
-
- d1_sat.satellite_var_value = 10.0
- d2_sat.satellite_var_value = 20.0
- d1_ins.satellite_var_value = 30.0
- d2_ins.satellite_var_value = 40.0
+ d1_sat.data = [DataPoint(
+ variable_name='sea_surface_temperature',
+ variable_value=10.0
+ )]
+ d2_sat.data = [DataPoint(
+ variable_name='sea_surface_temperature',
+ variable_value=20.0
+ )]
+ d1_ins.data = [DataPoint(
+ variable_name='sea_surface_temperature',
+ variable_value=30.0
+ )]
+ d2_ins.data = [DataPoint(
+ variable_name='sea_surface_temperature',
+ variable_value=40.0
+ )]
fake_spark_result = {
d1_sat: [d1_ins, d2_ins],
@@ -172,18 +238,18 @@ def test_calc():
assert matches['x'] == '-180'
assert matches['y'] == '-90'
- assert json_matchup_result['data'][0]['sea_surface_temperature'] == 10.0
- assert json_matchup_result['data'][1]['sea_surface_temperature'] == 20.0
- assert json_matchup_result['data'][0]['matches'][0]['sea_surface_temperature'] == 30.0
- assert json_matchup_result['data'][0]['matches'][1]['sea_surface_temperature'] == 40.0
- assert json_matchup_result['data'][1]['matches'][0]['sea_surface_temperature'] == 30.0
- assert json_matchup_result['data'][1]['matches'][1]['sea_surface_temperature'] == 40.0
+ assert json_matchup_result['data'][0]['data'][0]['variable_value'] == 10.0
+ assert json_matchup_result['data'][1]['data'][0]['variable_value'] == 20.0
+ assert json_matchup_result['data'][0]['matches'][0]['data'][0]['variable_value'] == 30.0
+ assert json_matchup_result['data'][0]['matches'][1]['data'][0]['variable_value'] == 40.0
+ assert json_matchup_result['data'][1]['matches'][0]['data'][0]['variable_value'] == 30.0
+ assert json_matchup_result['data'][1]['matches'][1]['data'][0]['variable_value'] == 40.0
assert json_matchup_result['details']['numInSituMatched'] == 4
assert json_matchup_result['details']['numGriddedMatched'] == 2
-def test_match_satellite_to_insitu(test_dir):
+def test_match_satellite_to_insitu(test_dir, test_tile, test_matchup_args):
"""
Test the test_match_satellite_to_insitu and ensure the matchup is
done as expected, where the tile points and in-situ points are all
@@ -216,33 +282,13 @@ def test_match_satellite_to_insitu(test_dir):
with secondary point (5, 15) and primary point (20, 0) should match
with (18, 3)
"""
- tile = Tile()
- tile.tile_id = 1
- tile.tile_min_lat = 0
- tile.tile_max_lat = 20
- tile.tile_min_lon = 0
- tile.tile_max_lon = 20
- tile.dataset = 'test-dataset'
- tile.dataset_id = 123
- tile.granule = 'test-granule-name'
- tile.min_time = '2020-07-28T00:00:00'
- tile.max_time = '2020-07-28T00:00:00'
- tile.section_spec = 'test-section-spec'
- tile.var_name = 'sea_surface_temperature'
- tile.latitudes = np.array([0, 20], dtype=np.float32)
- tile.longitudes = np.array([0, 20], dtype=np.float32)
- tile.times = [1627490285]
- tile.data = np.array([[[11.0, 21.0], [31.0, 41.0]]])
- tile.get_indices = lambda: [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1]]
- tile.meta_data = {'wind_type': []}
-
- tile_service_factory = mock.MagicMock()
- tile_service = mock.MagicMock()
- tile_service_factory.return_value = tile_service
- tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
- tile_service.get_min_time.return_value = 1627490285
- tile_service.get_max_time.return_value = 1627490285
- tile_service.mask_tiles_to_polygon.return_value = [tile]
+ test_tile.variables = [TileVariable('sst', 'sea_surface_temperature')]
+ test_tile.latitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.longitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.times = [1627490285]
+ test_tile.data = np.array([[[11.0, 21.0], [31.0, 41.0]]])
+ test_tile.get_indices = lambda: [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1]]
+ test_tile.is_multi = False
tile_ids = [1]
polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
@@ -255,11 +301,9 @@ def test_match_satellite_to_insitu(test_dir):
radius_tolerance = 1000000.0
platforms = '1,2,3,4,5,6,7,8,9'
- class MockSparkParam:
- def __init__(self, value):
- self.value = value
-
- with mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName') as mock_edge_endpoints:
+ with mock.patch(
+ 'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+ ) as mock_edge_endpoints:
# Test the satellite->insitu branch
# By mocking the getEndpointsByName function we are forcing
# Matchup to think this dummy matchup dataset is an insitu
@@ -279,7 +323,7 @@ def test_match_satellite_to_insitu(test_dir):
bounding_wkt_b=MockSparkParam(polygon_wkt),
depth_min_b=MockSparkParam(depth_min),
depth_max_b=MockSparkParam(depth_max),
- tile_service_factory=tile_service_factory
+ tile_service_factory=setup_mock_tile_service(test_tile)
)
generator = matchup.match_satellite_to_insitu(**match_args)
@@ -299,21 +343,31 @@ def test_match_satellite_to_insitu(test_dir):
# Each primary point matched with 1 matchup point
assert len(matchup_result[0]) == 2
assert len(matchup_result[1]) == 2
- # Check that the satellite point was matched to the expected insitu point
+ # Check that the satellite point was matched to the expected secondary point
assert matchup_result[0][1].latitude == 3.0
assert matchup_result[0][1].longitude == 18.0
assert matchup_result[1][1].latitude == 15.0
assert matchup_result[1][1].longitude == 5.0
- # Check that the insitu points have the expected values
+ # Check that the secondary points have the expected values
if insitu_matchup:
- assert matchup_result[0][1].sst == 30.0
- assert matchup_result[1][1].sst == 10.0
+ assert matchup_result[0][1].data[0].variable_value == 30.0
+ assert matchup_result[1][1].data[0].variable_value == 10.0
+ assert matchup_result[0][1].data[0].variable_name == 'sea_water_temperature'
+ assert matchup_result[1][1].data[0].variable_name == 'sea_water_temperature'
else:
- assert matchup_result[0][1].satellite_var_value == 30.0
- assert matchup_result[1][1].satellite_var_value == 10.0
+ assert matchup_result[0][1].data[0].variable_value == 30.0
+ assert matchup_result[1][1].data[0].variable_value == 10.0
+ assert matchup_result[0][1].data[0].variable_name == 'sst'
+ assert matchup_result[0][1].data[0].cf_variable_name == 'sea_surface_temperature'
+ assert matchup_result[1][1].data[0].variable_name == 'sst'
+ assert matchup_result[1][1].data[0].cf_variable_name == 'sea_surface_temperature'
# Check that the satellite points have the expected values
- assert matchup_result[0][0].satellite_var_value == 21.0
- assert matchup_result[1][0].satellite_var_value == 31.0
+ assert matchup_result[0][0].data[0].variable_value == 21.0
+ assert matchup_result[1][0].data[0].variable_value == 31.0
+ assert matchup_result[0][0].data[0].variable_name == 'sst'
+ assert matchup_result[0][0].data[0].cf_variable_name == 'sea_surface_temperature'
+ assert matchup_result[1][0].data[0].variable_name == 'sst'
+ assert matchup_result[1][0].data[0].cf_variable_name == 'sea_surface_temperature'
insitu_matchup_result = list(generator)
validate_matchup_result(insitu_matchup_result, insitu_matchup=True)
@@ -330,28 +384,145 @@ def test_match_satellite_to_insitu(test_dir):
points = [wkt.loads(result['point']) for result in edge_json['results']]
matchup_tile = Tile()
- matchup_tile.tile_id = 1
- matchup_tile.tile_min_lat = 3
- matchup_tile.tile_max_lat = 15
- matchup_tile.tile_min_lon = 5
- matchup_tile.tile_max_lon = 18
- matchup_tile.dataset = 'test-dataset'
- matchup_tile.dataset_id = 123
- matchup_tile.granule = 'test-granule-name'
- matchup_tile.min_time = '2020-07-28T00:00:00'
- matchup_tile.max_time = '2020-07-28T00:00:00'
- matchup_tile.section_spec = 'test-section-spec'
- matchup_tile.var_name = 'sea_surface_temperature'
+ matchup_tile.variables = [TileVariable('sst', 'sea_surface_temperature')]
matchup_tile.latitudes = np.array([point.y for point in points], dtype=np.float32)
matchup_tile.longitudes = np.array([point.x for point in points], dtype=np.float32)
matchup_tile.times = [edge_json['results'][0]['time']]
matchup_tile.data = np.array([[[10.0, 0, 0], [0, 20.0, 0], [0, 0, 30.0]]])
matchup_tile.get_indices = lambda: [[0, 0, 0], [0, 1, 1], [0, 2, 2]]
- matchup_tile.meta_data = {'wind_type': []}
+ matchup_tile.is_multi = False
- tile_service.find_tiles_in_polygon.return_value = [matchup_tile]
+ match_args['tile_service_factory']().find_tiles_in_polygon.return_value = [matchup_tile]
generator = matchup.match_satellite_to_insitu(**match_args)
sat_matchup_result = list(generator)
validate_matchup_result(sat_matchup_result, insitu_matchup=False)
+
+
+def test_multi_variable_matchup(test_dir, test_tile, test_matchup_args):
+ """
+ Test multi-variable satellite to in-situ matchup functionality.
+ """
+ test_tile.latitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.longitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.times = [1627490285]
+ test_tile.data = np.array([
+ [[
+ [1.10, 2.10],
+ [3.10, 4.10]
+ ]],
+ [[
+ [11.0, 21.0],
+ [31.0, 41.0]
+ ]]
+ ])
+ test_tile.is_multi = True
+ test_tile.variables = [
+ TileVariable('wind_speed', 'wind_speed'),
+ TileVariable('wind_dir', 'wind_direction'),
+ ]
+ test_tile.standard_names = ['', '']
+ test_matchup_args['tile_service_factory'] = setup_mock_tile_service(test_tile)
+
+ with mock.patch(
+ 'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+ ) as mock_edge_endpoints:
+ # Test the satellite->insitu branch
+ # By mocking the getEndpointsByName function we are forcing
+ # Matchup to think this dummy matchup dataset is an insitu
+ # dataset
+ mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+ matchup.query_edge = lambda *args, **kwargs: json.load(
+ open(os.path.join(test_dir, 'edge_response.json')))
+
+ generator = matchup.match_satellite_to_insitu(**test_matchup_args)
+
+ insitu_matchup_result = list(generator)
+
+ tile_var_names = [var.variable_name for var in test_tile.variables]
+
+ # wind_speed is first, wind_dir is second
+ for data_dict in insitu_matchup_result[0][0].data:
+ assert data_dict.variable_name in tile_var_names
+ if data_dict.variable_name == 'wind_speed':
+ assert data_dict.variable_value == 2.10
+ elif data_dict.variable_name == 'wind_dir':
+ assert data_dict.variable_value == 21.0
+ for data_dict in insitu_matchup_result[1][0].data:
+ assert data_dict.variable_name in tile_var_names
+ if data_dict.variable_name == 'wind_speed':
+ assert data_dict.variable_value == 3.10
+ elif data_dict.variable_name == 'wind_dir':
+ assert data_dict.variable_value == 31.0
+
+
+def test_multi_variable_satellite_to_satellite_matchup(test_dir, test_tile, test_matchup_args):
+ """
+ Test multi-variable satellite to satellite matchup functionality.
+ """
+ test_tile.latitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.longitudes = np.array([0, 20], dtype=np.float32)
+ test_tile.times = [1627490285]
+ test_tile.data = np.array([
+ [[
+ [1.10, 2.10],
+ [3.10, 4.10]
+ ]],
+ [[
+ [11.0, 21.0],
+ [31.0, 41.0]
+ ]]
+ ])
+ test_tile.is_multi = True
+ test_tile.variables = [
+ TileVariable('wind_speed', 'wind_speed'),
+ TileVariable('wind_dir', 'wind_direction')
+ ]
+ test_matchup_args['tile_service_factory'] = setup_mock_tile_service(test_tile)
+
+ with mock.patch(
+ 'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+ ) as mock_edge_endpoints:
+ mock_edge_endpoints.return_value = None
+ # Open the edge response json. We want to convert these points
+ # to tile points so we can test sat to sat matchup
+ edge_json = json.load(open(os.path.join(test_dir, 'edge_response.json')))
+ points = [wkt.loads(result['point']) for result in edge_json['results']]
+
+ matchup_tile = Tile()
+ matchup_tile.variables = [
+ TileVariable('sst', 'sea_surface_temperature'),
+ TileVariable('wind_dir', 'wind_direction')
+ ]
+ matchup_tile.latitudes = np.array([point.y for point in points], dtype=np.float32)
+ matchup_tile.longitudes = np.array([point.x for point in points], dtype=np.float32)
+ matchup_tile.times = [edge_json['results'][0]['time']]
+ matchup_tile.data = np.array([
+ [[
+ [10.0, 0, 0],
+ [0, 20.0, 0],
+ [0, 0, 30.0]
+ ]],
+ [[
+ [100.0, 0, 0],
+ [0, 200.0, 0],
+ [0, 0, 300.0]
+ ]]
+ ])
+ # matchup_tile.get_indices = lambda: [[0, 0, 0], [0, 1, 1], [0, 2, 2]]
+ matchup_tile.is_multi = True
+
+ test_matchup_args['tile_service_factory']().find_tiles_in_polygon.return_value = [
+ matchup_tile
+ ]
+
+ generator = matchup.match_satellite_to_insitu(**test_matchup_args)
+ matchup_result = list(generator)
+ assert len(matchup_result) == 2
+ assert len(matchup_result[0]) == 2
+ assert len(matchup_result[1]) == 2
+ assert len(matchup_result[0][0].data) == 2
+ assert len(matchup_result[0][1].data) == 2
+ assert len(matchup_result[1][0].data) == 2
+ assert len(matchup_result[1][1].data) == 2
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index f6275f4..56eebab 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -22,6 +22,7 @@ from shapely.geometry import Polygon
from datetime import datetime
from itertools import chain
from math import cos, radians
+from dataclasses import dataclass
import numpy as np
import pyproj
@@ -229,7 +230,7 @@ class Matchup(NexusCalcSparkHandler):
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
'tile_min_lat asc'], rows=5000)]
- self.log.debug('Found %s tile_ids', len(tile_ids))
+ self.log.info('Found %s tile_ids', len(tile_ids))
# Call spark_matchup
self.log.debug("Calling Spark Driver")
try:
@@ -321,12 +322,29 @@ class Matchup(NexusCalcSparkHandler):
"fileurl": domspoint.file_url,
"id": domspoint.data_id,
"source": domspoint.source,
+ "data": [data_point.__dict__ for data_point in domspoint.data]
}
- if domspoint.satellite_var_name:
- doms_dict[domspoint.satellite_var_name] = domspoint.satellite_var_value
return doms_dict
+@dataclass
+class DataPoint:
+ """
+ Represents a single point of data. This is used to construct the
+ output of the matchup algorithm.
+
+ :attribute variable_name: The name of the NetCDF variable.
+ :attribute cf_variable_name: The CF standard_name of the
+ NetCDF variable. This will be None if the standard_name does not
+ exist in the source data file.
+ :attribute variable_value: value at some point for the given
+ variable.
+ """
+ variable_name: str = None
+ cf_variable_name: str = None
+ variable_value: float = None
+
+
class DomsPoint(object):
def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
@@ -336,8 +354,7 @@ class DomsPoint(object):
self.depth = depth
self.data_id = data_id
- self.satellite_var_name = None
- self.satellite_var_value = None
+ self.data = None
self.wind_u = None
self.wind_v = None
@@ -362,10 +379,20 @@ class DomsPoint(object):
point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
- # Get the name of the satellite variable from the source NetCDF
- satellite_var_name = tile.var_name
- point.satellite_var_name = satellite_var_name
- point.satellite_var_value = nexus_point.data_val.item()
+ if tile.is_multi:
+ data_vals = nexus_point.data_vals
+ else:
+ data_vals = [nexus_point.data_vals]
+
+ data = []
+ for data_val, variable in zip(data_vals, tile.variables):
+ if data_val:
+ data.append(DataPoint(
+ variable_name=variable.variable_name,
+ variable_value=data_val,
+ cf_variable_name=variable.standard_name
+ ))
+ point.data = data
try:
point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
@@ -430,15 +457,45 @@ class DomsPoint(object):
point.device = edge_point.get('device')
point.file_url = edge_point.get('fileurl')
+ data_fields = [
+ 'eastward_wind',
+ 'northward_wind',
+ 'wind_direction',
+ 'wind_speed',
+ 'sea_water_temperature',
+ 'sea_water_temperature_depth',
+ 'sea_water_salinity',
+ 'sea_water_salinity_depth',
+ ]
+ data = []
+ # This is for in-situ secondary points
+ for name in data_fields:
+ val = edge_point.get(name)
+ if val:
+ data.append(DataPoint(
+ variable_name=name,
+ variable_value=val
+ ))
+
+
+ # This is for satellite secondary points
+ if 'variables' in edge_point:
+
+ data.extend([DataPoint(
+ variable_name=variable.variable_name,
+ variable_value=var_value,
+ cf_variable_name=variable.standard_name
+ ) for var_value, variable in zip(
+ edge_point['var_values'],
+ edge_point['variables']
+ ) if var_value])
+ point.data = data
+
try:
point.data_id = str(edge_point['id'])
except KeyError:
point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
- if 'var_name' in edge_point and 'var_value' in edge_point:
- point.satellite_var_name = edge_point['var_name']
- point.satellite_var_value = edge_point['var_value']
-
return point
@@ -547,6 +604,11 @@ def tile_to_edge_points(tile):
edge_points = []
for idx in indices:
+ if tile.is_multi:
+ data = [var_data[tuple(idx)] for var_data in tile.data]
+ else:
+ data = [tile.data[tuple(idx)]]
+
edge_point = {
'point': f'Point({tile.longitudes[idx[2]]} {tile.latitudes[idx[1]]})',
'time': datetime.utcfromtimestamp(tile.times[idx[0]]).strftime('%Y-%m-%dT%H:%M:%SZ'),
@@ -554,8 +616,8 @@ def tile_to_edge_points(tile):
'platform': None,
'device': None,
'fileurl': tile.granule,
- 'var_name': tile.var_name,
- 'var_value': tile.data[tuple(idx)]
+ 'variables': tile.variables,
+ 'var_values': data
}
edge_points.append(edge_point)
return edge_points
@@ -720,13 +782,17 @@ def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, s
print("%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id))
for i, point_matches in enumerate(matched_indexes):
if len(point_matches) > 0:
+ if tile.is_multi:
+ data_vals = [tile_data[tuple(valid_indices[i])] for tile_data in tile.data]
+ else:
+ data_vals = tile.data[tuple(valid_indices[i])]
p_nexus_point = NexusPoint(
latitude=tile.latitudes[valid_indices[i][1]],
longitude=tile.longitudes[valid_indices[i][2]],
depth=None,
time=tile.times[valid_indices[i][0]],
index=valid_indices[i],
- data_val=tile.data[tuple(valid_indices[i])]
+ data_vals=data_vals
)
p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile)
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index 86a8469..e410b99 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -53,6 +53,31 @@ class NexusTileData(Model):
return from_shaped_array(the_tile_data.variable_data)
def get_lat_lon_time_data_meta(self):
+ """
+ Retrieve data from data store and metadata from metadata store
+ for this tile. For gridded tiles, the tile shape of the data
+ will match the input shape. For example, if the input was a
+ 30x30 tile, all variables will also be 30x30. However, if the
+ tile is a swath tile, the data will be transformed along the
+ diagonal of the data matrix. For example, a 30x30 tile would
+ become 900x900 where the 900 points are along the diagonal.
+
+ Multi-variable tile will also include an extra dimension in the
+ data array. For example, a 30 x 30 x 30 array would be
+ transformed to N x 30 x 30 x 30 where N is the number of
+ variables in this tile.
+
+ latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
+
+ :return: latitude data
+ :return: longitude data
+ :return: time data
+ :return: data
+ :return: meta data dictionary
+ :return: boolean flag, True if this tile has more than one variable
+ """
+ is_multi_var = False
+
if self._get_nexus_tile().HasField('grid_tile'):
grid_tile = self._get_nexus_tile().grid_tile
@@ -72,7 +97,7 @@ class NexusTileData(Model):
meta_array = meta_array[np.newaxis, :]
meta_data[name] = meta_array
- return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
+ return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
elif self._get_nexus_tile().HasField('swath_tile'):
swath_tile = self._get_nexus_tile().swath_tile
@@ -97,7 +122,7 @@ class NexusTileData(Model):
reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
meta_data[name] = reshaped_meta_array
- return latitude_data, longitude_data, time_data, tile_data, meta_data
+ return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
elif self._get_nexus_tile().HasField('time_series_tile'):
time_series_tile = self._get_nexus_tile().time_series_tile
@@ -122,12 +147,83 @@ class NexusTileData(Model):
meta_data[name] = reshaped_meta_array
- return latitude_data, longitude_data, time_data, tile_data, meta_data
+ return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
+ elif self._get_nexus_tile().HasField('swath_multi_variable_tile'):
+ swath_tile = self._get_nexus_tile().swath_multi_variable_tile
+ is_multi_var = True
+
+ latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
+ longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
+ time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
+
+ # Simplify the tile if the time dimension is the same value repeated
+ if np.all(time_data == np.min(time_data)):
+ time_data = np.array([np.min(time_data)])
+
+ swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
+
+ desired_shape = (
+ len(time_data),
+ len(latitude_data),
+ len(longitude_data),
+ )
+ tile_data = self._to_standard_index(swath_tile_data, desired_shape, is_multi_var=True)
+
+ # Extract the meta data
+ meta_data = {}
+ for meta_data_obj in swath_tile.meta_data:
+ name = meta_data_obj.name
+ actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+ reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
+ meta_data[name] = reshaped_meta_array
+
+ return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
+ elif self._get_nexus_tile().HasField('grid_multi_variable_tile'):
+ grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile
+ is_multi_var = True
+
+ grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.variable_data))
+ latitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.latitude))
+ longitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.longitude))
+
+ # If there are 3 dimensions, that means the time dimension
+ # was squeezed. Add back in
+ if len(grid_tile_data.shape) == 3:
+ grid_tile_data = np.expand_dims(grid_tile_data, axis=1)
+ # If there are 4 dimensions, that means the time dimension
+ # is present. Move the multivar dimension.
+ if len(grid_tile_data.shape) == 4:
+ grid_tile_data = np.moveaxis(grid_tile_data, -1, 0)
+
+ # Extract the meta data
+ meta_data = {}
+ for meta_data_obj in grid_multi_variable_tile.meta_data:
+ name = meta_data_obj.name
+ meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+ if len(meta_array.shape) == 2:
+ meta_array = meta_array[np.newaxis, :]
+ meta_data[name] = meta_array
+
+ return latitude_data, longitude_data, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var
else:
- raise NotImplementedError("Only supports grid_tile, swath_tile, and time_series_tile")
+ raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile")
@staticmethod
- def _to_standard_index(data_array, desired_shape):
+ def _to_standard_index(data_array, desired_shape, is_multi_var=False):
+ """
+ Transform swath data to a standard format where data runs along
+ diagonal of ND matrix and the non-diagonal data points are
+ masked
+
+ :param data_array: The data array to be transformed
+ :param desired_shape: The desired shape of the resulting array
+ :param is_multi_var: True if this is a multi-variable tile
+ :type data_array: np.array
+ :type desired_shape: tuple
+ :type is_multi_var: bool
+ :return: Reshaped array
+ :rtype: np.array
+ """
if desired_shape[0] == 1:
reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
@@ -138,6 +234,23 @@ class NexusTileData(Model):
reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
row.flat, col.flat]
reshaped_array = reshaped_array[np.newaxis, :]
+ elif is_multi_var == True:
+ # Break the array up by variable. Translate shape from
+ # len(times) x len(latitudes) x len(longitudes) x num_vars,
+ # to
+ # num_vars x len(times) x len(latitudes) x len(longitudes)
+ reshaped_data_array = np.moveaxis(data_array, -1, 0)
+ reshaped_array = []
+
+ for variable_data_array in reshaped_data_array:
+ variable_reshaped_array = np.ma.masked_all(desired_shape)
+ row, col = np.indices(variable_data_array.shape)
+
+ variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[
+ row.flat, col.flat]
+ variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[
+ row.flat, col.flat]
+ reshaped_array.append(variable_reshaped_array)
else:
reshaped_array = np.ma.masked_all(desired_shape)
row, col = np.indices(data_array.shape)
diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py
index 01b4283..753d264 100644
--- a/data-access/nexustiles/model/nexusmodel.py
+++ b/data-access/nexustiles/model/nexusmodel.py
@@ -16,34 +16,81 @@
from collections import namedtuple
import numpy as np
+from dataclasses import dataclass
-NexusPoint = namedtuple('NexusPoint', 'latitude longitude depth time index data_val')
+
+NexusPoint = namedtuple('NexusPoint', 'latitude longitude depth time index data_vals')
BBox = namedtuple('BBox', 'min_lat max_lat min_lon max_lon')
TileStats = namedtuple('TileStats', 'min max mean count')
-class Tile(object):
- def __init__(self):
- self.tile_id = None
- self.dataset_id = None
- self.section_spec = None
- self.dataset = None
- self.granule = None
-
- self.bbox = None
-
- self.min_time = None
- self.max_time = None
+@dataclass
+class TileVariable:
+ """
+ TileVariable class representing a single variable. This contains
+ both the name of the variable and the CF standard name.
- self.tile_stats = None
- self.var_name = None
+ :attribute variable_name: Name of the variable in the tile. This is
+ the name from the satellite data file.
+ :attribute standard_name: CF Standard name of the variable in the
+ tile. This is the 'standard_name' attribute from the satellite data file. This might be null in the case where the source variable does not contain a standard_name attribute.
+ """
+ variable_name: str = None
+ standard_name: str = None
- self.latitudes = None # This should be a 1-d ndarray
- self.longitudes = None # This should be a 1-d ndarray
- self.times = None # This should be a 1-d ndarray
- self.data = None # This should be an ndarray with shape len(times) x len(latitudes) x len(longitudes)
- self.meta_data = None # This should be a dict of the form { 'meta_data_name' : [[[ndarray]]] }. Each ndarray should be the same shape as data.
+@dataclass
+class Tile(object):
+ """
+ Tile class representing the contents of a tile. The tile contents
+ are populated using the metadata store and the data store.
+
+ :attribute tile_id: Unique UUID tile ID, also used in data store and
+ metadata store to distinguish this tile
+ :attribute dataset_id: Unique dataset ID this tile belongs to
+ :attribute section_spec: A summary of the indices used from the source
+ granule to create this tile. Format is
+ dimension:min_index:max_index,dimension:min_index:max_index,...
+ :attribute dataset: The name of the dataset this tile belongs to
+ :attribute granule: The name of the granule this tile is sourced from
+ :attribute bbox: Comma-separated string representing the spatial bounds
+ of this tile. The format is min_lon, min_lat, max_lon, max_lat
+ :attribute min_time: ISO 8601 formatted timestamp representing the
+ temporal minimum of this tile
+ :attribute max_time: ISO 8601 formatted timestamp representing the
+ temporal maximum of this tile
+ :attribute tile_stats: Dictionary representing the min, max, mean, and
+ count of this tile
+ :attribute variables: A list of size N where N == the number of vars
+ this tile represents. The list type is TileVariable.
+ :attribute latitudes: 1-d ndarray representing the latitude values of
+ this tile
+ :attribute longitudes: 1-d ndarray representing the longitude values of
+ this tile
+ :attribute times: 1-d ndarray representing the time values of
+ this tile
+ :attribute data: This should be an ndarray with shape len(times) x
+ len(latitudes) x len(longitudes) x num_vars
+ :attribute is_multi: 'True' if this is a multi-var tile
+ :attribute meta_data: dict of the form {'meta_data_name':
+ [[[ndarray]]]}. Each ndarray should be the same shape as data.
+ """
+ tile_id: str = None
+ dataset_id: str = None
+ section_spec: str = None
+ dataset: str = None
+ granule: str = None
+ bbox: str = None
+ min_time: str = None
+ max_time: str = None
+ tile_stats: dict = None
+ variables: list = None
+ latitudes: np.array = None
+ longitudes: np.array = None
+ times: np.array = None
+ data: np.array = None
+ is_multi: bool = None
+ meta_data: dict = None
def __str__(self):
return str(self.get_summary())
@@ -100,6 +147,13 @@ class Tile(object):
def get_indices(self, include_nan=False):
if include_nan:
return list(np.ndindex(self.data.shape))
+ if self.is_multi:
+ # For each variable, combine masks. This is a logical or
+ # operation, because we want to ensure we don't lose any
+ # data.
+ combined_data_mask = np.logical_or(*self.data)
+ # Return the indices where the data is valid
+ return np.argwhere(combined_data_mask)
else:
return np.transpose(np.where(np.ma.getmaskarray(self.data) == False)).tolist()
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index eb847d2..573216e 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -16,8 +16,9 @@
import configparser
import logging
import sys
+import json
from datetime import datetime
-from functools import wraps
+from functools import wraps, reduce
import numpy as np
import numpy.ma as ma
@@ -29,7 +30,7 @@ from .dao import CassandraProxy
from .dao import DynamoProxy
from .dao import S3Proxy
from .dao import SolrProxy
-from .model.nexusmodel import Tile, BBox, TileStats
+from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -356,7 +357,16 @@ class NexusTileService(object):
| ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \
| ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :]
- tile.data = ma.masked_where(data_mask, tile.data)
+ # If this is multi-var, need to mask each variable separately.
+ if tile.is_multi:
+ # Combine space/time mask with existing mask on data
+ data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask])
+
+ num_vars = len(tile.data)
+ multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0)
+ tile.data = ma.masked_where(multi_data_mask, tile.data)
+ else:
+ tile.data = ma.masked_where(data_mask, tile.data)
tiles[:] = [tile for tile in tiles if not tile.data.mask.all()]
@@ -437,13 +447,14 @@ class NexusTileService(object):
raise Exception("Missing data for tile_id(s) %s." % missing_data)
for a_tile in tiles:
- lats, lons, times, data, meta = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta()
+ lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta()
a_tile.latitudes = lats
a_tile.longitudes = lons
a_tile.times = times
a_tile.data = data
a_tile.meta_data = meta
+ a_tile.is_multi = is_multi_var
del (tile_data_by_id[a_tile.tile_id])
@@ -519,10 +530,45 @@ class NexusTileService(object):
pass
try:
- tile.var_name = solr_doc['tile_var_name_s']
+ # Ensure backwards compatibility by working with old
+ # tile_var_name_s and tile_standard_name_s fields to
+ # populate tile.variables. These values
+ # will be overwritten if tile_var_name_ss is present
+ # as well.
+ if '[' in solr_doc['tile_var_name_s']:
+ var_names = json.loads(solr_doc['tile_var_name_s'])
+ else:
+ var_names = [solr_doc['tile_var_name_s']]
+
+ standard_name = solr_doc.get(
+ 'tile_standard_name_s',
+ json.dumps([None] * len(var_names))
+ )
+ if '[' in standard_name:
+ standard_names = json.loads(standard_name)
+ else:
+ standard_names = [standard_name]
+
+ tile.variables = []
+ for var_name, standard_name in zip(var_names, standard_names):
+ tile.variables.append(TileVariable(
+ variable_name=var_name,
+ standard_name=standard_name
+ ))
except KeyError:
pass
+
+ if 'tile_var_name_ss' in solr_doc:
+ tile.variables = []
+ for var_name in solr_doc['tile_var_name_ss']:
+ standard_name_key = f'{var_name}.tile_standard_name_s'
+ standard_name = solr_doc.get(standard_name_key)
+ tile.variables.append(TileVariable(
+ variable_name=var_name,
+ standard_name=standard_name
+ ))
+
tiles.append(tile)
return tiles