You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by tl...@apache.org on 2022/04/12 18:00:34 UTC

[incubator-sdap-nexus] 01/01: wip: have s3 datastore option available from the webapp call

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch s3Tiles
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 5675a39fdde088cbf4b4bafd3adc8d03c040f3a0
Author: Thomas Loubrieu <lo...@jpl.nasa.gov>
AuthorDate: Tue Apr 12 11:00:18 2022 -0700

    wip: have s3 datastore option available from the webapp call
---
 analysis/webservice/webapp.py                      |   3 +
 data-access/nexustiles/dao/CassandraProxy.py       | 223 +--------------------
 .../{CassandraProxy.py => NexusTileDataBase.py}    |  96 +--------
 data-access/nexustiles/dao/S3Proxy.py              |  82 +-------
 4 files changed, 8 insertions(+), 396 deletions(-)

diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index abb09b8..9c4410e 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -73,9 +73,12 @@ if __name__ == "__main__":
            help='time out for solr requests in seconds, default (60) is ok for most deployments'
                 ' when solr performances are not good this might need to be increased')
     define('solr_host', help='solr host and port')
+    define('datastore_store', help='datastore (cassandra or s3)')
     define('cassandra_host', help='cassandra host')
     define('cassandra_username', help='cassandra username')
     define('cassandra_password', help='cassandra password')
+    define('s3_bucket', help='s3 bucket')
+    define('s3_region', help='s3 region')
 
     parse_command_line()
     algorithm_config = inject_args_in_config(options, algorithm_config)
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index e410b99..3f81483 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -25,12 +25,14 @@ from cassandra.cqlengine.models import Model
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
 from multiprocessing.synchronize import Lock
 from nexusproto.serialization import from_shaped_array
+from .NexusTileDataBase import NexusTileDataBase
 
 INIT_LOCK = Lock(ctx=None)
 
 logger = logging.getLogger(__name__)
 
-class NexusTileData(Model):
+
+class NexusTileData(Model, NexusTileDataBase):
     __table_name__ = 'sea_surface_temp'
     tile_id = columns.UUID(primary_key=True)
     tile_blob = columns.Blob()
@@ -43,225 +45,6 @@ class NexusTileData(Model):
 
         return self.__nexus_tile
 
-    def get_raw_data_array(self):
-
-        nexus_tile = self._get_nexus_tile()
-        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
-
-        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
-
-        return from_shaped_array(the_tile_data.variable_data)
-
-    def get_lat_lon_time_data_meta(self):
-        """
-        Retrieve data from data store and metadata from metadata store
-        for this tile. For gridded tiles, the tile shape of the data
-        will match the input shape. For example, if the input was a
-        30x30 tile, all variables will also be 30x30. However, if the
-        tile is a swath tile, the data will be transformed along the
-        diagonal of the data matrix. For example, a 30x30 tile would
-        become 900x900 where the 900 points are along the diagonal.
-
-        Multi-variable tile will also include an extra dimension in the
-        data array. For example, a 30 x 30 x 30 array would be
-        transformed to N x 30 x 30 x 30 where N is the number of
-         variables in this tile.
-
-        latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
-
-        :return: latitude data
-        :return: longitude data
-        :return: time data
-        :return: data
-        :return: meta data dictionary
-        :return: boolean flag, True if this tile has more than one variable
-        """
-        is_multi_var = False
-
-        if self._get_nexus_tile().HasField('grid_tile'):
-            grid_tile = self._get_nexus_tile().grid_tile
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
-
-            if len(grid_tile_data.shape) == 2:
-                grid_tile_data = grid_tile_data[np.newaxis, :]
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('swath_tile'):
-            swath_tile = self._get_nexus_tile().swath_tile
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            tile_data = self._to_standard_index(swath_tile_data,
-                                                (len(time_data), len(latitude_data), len(longitude_data)))
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('time_series_tile'):
-            time_series_tile = self._get_nexus_tile().time_series_tile
-
-            time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data))
-            time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1)
-            latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude))
-
-            reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
-            idx = np.arange(len(latitude_data))
-            reshaped_array[:, idx, idx] = time_series_tile_data
-            tile_data = reshaped_array
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in time_series_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-
-                reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
-                idx = np.arange(len(latitude_data))
-                reshaped_meta_array[:, idx, idx] = meta_array
-
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('swath_multi_variable_tile'):
-            swath_tile = self._get_nexus_tile().swath_multi_variable_tile
-            is_multi_var = True
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            desired_shape = (
-                len(time_data),
-                len(latitude_data),
-                len(longitude_data),
-            )
-            tile_data = self._to_standard_index(swath_tile_data, desired_shape, is_multi_var=True)
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('grid_multi_variable_tile'):
-            grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile
-            is_multi_var = True
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.longitude))
-
-            # If there are 3 dimensions, that means the time dimension
-            # was squeezed. Add back in
-            if len(grid_tile_data.shape) == 3:
-                grid_tile_data = np.expand_dims(grid_tile_data, axis=1)
-            # If there are 4 dimensions, that means the time dimension
-            # is present. Move the multivar dimension.
-            if len(grid_tile_data.shape) == 4:
-                grid_tile_data = np.moveaxis(grid_tile_data, -1, 0)
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_multi_variable_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var
-        else:
-            raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile")
-
-    @staticmethod
-    def _to_standard_index(data_array, desired_shape, is_multi_var=False):
-        """
-        Transform swath data to a standard format where data runs along
-        diagonal of ND matrix and the non-diagonal data points are
-        masked
-
-        :param data_array: The data array to be transformed
-        :param desired_shape: The desired shape of the resulting array
-        :param is_multi_var: True if this is a multi-variable tile
-        :type data_array: np.array
-        :type desired_shape: tuple
-        :type is_multi_var: bool
-        :return: Reshaped array
-        :rtype: np.array
-        """
-
-        if desired_shape[0] == 1:
-            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-            reshaped_array = reshaped_array[np.newaxis, :]
-        elif is_multi_var == True:
-            # Break the array up by variable. Translate shape from
-            # len(times) x len(latitudes) x len(longitudes) x num_vars,
-            # to
-            # num_vars x len(times) x len(latitudes) x len(longitudes)
-            reshaped_data_array = np.moveaxis(data_array, -1, 0)
-            reshaped_array = []
-
-            for variable_data_array in reshaped_data_array:
-                variable_reshaped_array = np.ma.masked_all(desired_shape)
-                row, col = np.indices(variable_data_array.shape)
-
-                variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[
-                    row.flat, col.flat]
-                variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[
-                    row.flat, col.flat]
-                reshaped_array.append(variable_reshaped_array)
-        else:
-            reshaped_array = np.ma.masked_all(desired_shape)
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-
-        return reshaped_array
-
 
 class CassandraProxy(object):
     def __init__(self, config):
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/NexusTileDataBase.py
similarity index 74%
copy from data-access/nexustiles/dao/CassandraProxy.py
copy to data-access/nexustiles/dao/NexusTileDataBase.py
index e410b99..a15e544 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/NexusTileDataBase.py
@@ -1,48 +1,6 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-from configparser import NoOptionError
-
-import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
-from cassandra.auth import PlainTextAuthProvider
-from cassandra.cqlengine import columns, connection, CQLEngineException
-from cassandra.cqlengine.models import Model
-from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
-from multiprocessing.synchronize import Lock
-from nexusproto.serialization import from_shaped_array
-
-INIT_LOCK = Lock(ctx=None)
-
-logger = logging.getLogger(__name__)
-
-class NexusTileData(Model):
-    __table_name__ = 'sea_surface_temp'
-    tile_id = columns.UUID(primary_key=True)
-    tile_blob = columns.Blob()
-
-    __nexus_tile = None
-
-    def _get_nexus_tile(self):
-        if self.__nexus_tile is None:
-            self.__nexus_tile = nexusproto.TileData.FromString(self.tile_blob)
-
-        return self.__nexus_tile
 
+class NexusTileDataBase:
     def get_raw_data_array(self):
 
         nexus_tile = self._get_nexus_tile()
@@ -261,55 +219,3 @@ class NexusTileData(Model):
                 row.flat, col.flat]
 
         return reshaped_array
-
-
-class CassandraProxy(object):
-    def __init__(self, config):
-        self.config = config
-        self.__cass_url = config.get("cassandra", "host")
-        self.__cass_username = config.get("cassandra", "username")
-        self.__cass_password = config.get("cassandra", "password")
-        self.__cass_keyspace = config.get("cassandra", "keyspace")
-        self.__cass_local_DC = config.get("cassandra", "local_datacenter")
-        self.__cass_protocol_version = config.getint("cassandra", "protocol_version")
-        self.__cass_dc_policy = config.get("cassandra", "dc_policy")
-
-        try:
-            self.__cass_port = config.getint("cassandra", "port")
-        except NoOptionError:
-            self.__cass_port = 9042
-
-        with INIT_LOCK:
-            try:
-                connection.get_cluster()
-            except CQLEngineException:
-                self.__open()
-
-    def __open(self):
-        if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
-            dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
-            token_policy = TokenAwarePolicy(dc_policy)
-        elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
-            token_policy = WhiteListRoundRobinPolicy([self.__cass_url])
-
-        if self.__cass_username and self.__cass_password:
-            auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
-        else:
-            auth_provider = None
-
-        connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
-                         protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
-                         port=self.__cass_port,
-                         auth_provider=auth_provider)
-
-    def fetch_nexus_tiles(self, *tile_ids):
-        tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
-                    (isinstance(tile_id, str) or isinstance(tile_id, str))]
-
-        res = []
-        for tile_id in tile_ids:
-            filterResults = NexusTileData.objects.filter(tile_id=tile_id)
-            if len(filterResults) > 0:
-                res.append(filterResults[0])
-
-        return res
diff --git a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/dao/S3Proxy.py
index c8d3adf..b491a90 100644
--- a/data-access/nexustiles/dao/S3Proxy.py
+++ b/data-access/nexustiles/dao/S3Proxy.py
@@ -19,6 +19,7 @@ import boto3
 import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
 from nexusproto.serialization import from_shaped_array
+from .NexusTileDataBase import NexusTileDataBase
 
 
 class NexusTileData(object):
@@ -38,87 +39,6 @@ class NexusTileData(object):
 
         return self.__nexus_tile
 
-    def get_raw_data_array(self):
-
-        nexus_tile = self._get_nexus_tile()
-        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
-
-        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
-
-        return from_shaped_array(the_tile_data.variable_data)
-
-    def get_lat_lon_time_data_meta(self):
-        if self._get_nexus_tile().HasField('grid_tile'):
-            grid_tile = self._get_nexus_tile().grid_tile
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
-
-            if len(grid_tile_data.shape) == 2:
-                grid_tile_data = grid_tile_data[np.newaxis, :]
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
-        elif self._get_nexus_tile().HasField('swath_tile'):
-            swath_tile = self._get_nexus_tile().swath_tile
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            tile_data = self._to_standard_index(swath_tile_data,
-                                                (len(time_data), len(latitude_data), len(longitude_data)))
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data
-        else:
-            raise NotImplementedError("Only supports grid_tile and swath_tile")
-
-    @staticmethod
-    def _to_standard_index(data_array, desired_shape):
-
-        if desired_shape[0] == 1:
-            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-            reshaped_array = reshaped_array[np.newaxis, :]
-        else:
-            reshaped_array = np.ma.masked_all(desired_shape)
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-
-        return reshaped_array
-
 
 class S3Proxy(object):
     def __init__(self, config):