You are viewing a plain text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@sdap.apache.org by tl...@apache.org on 2022/04/12 18:00:33 UTC

[incubator-sdap-nexus] branch s3Tiles created (now 5675a39)

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a change to branch s3Tiles
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


      at 5675a39  wip: have s3 datastore option available from the webapp call

This branch includes the following new commits:

     new 5675a39  wip: have s3 datastore option available from the webapp call

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-nexus] 01/01: wip: have s3 datastore option available from the webapp call

Posted by tl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch s3Tiles
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 5675a39fdde088cbf4b4bafd3adc8d03c040f3a0
Author: Thomas Loubrieu <lo...@jpl.nasa.gov>
AuthorDate: Tue Apr 12 11:00:18 2022 -0700

    wip: have s3 datastore option available from the webapp call
---
 analysis/webservice/webapp.py                      |   3 +
 data-access/nexustiles/dao/CassandraProxy.py       | 223 +--------------------
 .../{CassandraProxy.py => NexusTileDataBase.py}    |  96 +--------
 data-access/nexustiles/dao/S3Proxy.py              |  82 +-------
 4 files changed, 8 insertions(+), 396 deletions(-)

diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index abb09b8..9c4410e 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -73,9 +73,12 @@ if __name__ == "__main__":
            help='time out for solr requests in seconds, default (60) is ok for most deployments'
                 ' when solr performances are not good this might need to be increased')
     define('solr_host', help='solr host and port')
+    define('datastore_store', help='datastore (cassandra or s3)')
     define('cassandra_host', help='cassandra host')
     define('cassandra_username', help='cassandra username')
     define('cassandra_password', help='cassandra password')
+    define('s3_bucket', help='s3 bucket')
+    define('s3_region', help='s3 region')
 
     parse_command_line()
     algorithm_config = inject_args_in_config(options, algorithm_config)
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py
index e410b99..3f81483 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/CassandraProxy.py
@@ -25,12 +25,14 @@ from cassandra.cqlengine.models import Model
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
 from multiprocessing.synchronize import Lock
 from nexusproto.serialization import from_shaped_array
+from .NexusTileDataBase import NexusTileDataBase
 
 INIT_LOCK = Lock(ctx=None)
 
 logger = logging.getLogger(__name__)
 
-class NexusTileData(Model):
+
+class NexusTileData(Model, NexusTileDataBase):
     __table_name__ = 'sea_surface_temp'
     tile_id = columns.UUID(primary_key=True)
     tile_blob = columns.Blob()
@@ -43,225 +45,6 @@ class NexusTileData(Model):
 
         return self.__nexus_tile
 
-    def get_raw_data_array(self):
-
-        nexus_tile = self._get_nexus_tile()
-        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
-
-        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
-
-        return from_shaped_array(the_tile_data.variable_data)
-
-    def get_lat_lon_time_data_meta(self):
-        """
-        Retrieve data from data store and metadata from metadata store
-        for this tile. For gridded tiles, the tile shape of the data
-        will match the input shape. For example, if the input was a
-        30x30 tile, all variables will also be 30x30. However, if the
-        tile is a swath tile, the data will be transformed along the
-        diagonal of the data matrix. For example, a 30x30 tile would
-        become 900x900 where the 900 points are along the diagonal.
-
-        Multi-variable tile will also include an extra dimension in the
-        data array. For example, a 30 x 30 x 30 array would be
-        transformed to N x 30 x 30 x 30 where N is the number of
-         variables in this tile.
-
-        latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
-
-        :return: latitude data
-        :return: longitude data
-        :return: time data
-        :return: data
-        :return: meta data dictionary
-        :return: boolean flag, True if this tile has more than one variable
-        """
-        is_multi_var = False
-
-        if self._get_nexus_tile().HasField('grid_tile'):
-            grid_tile = self._get_nexus_tile().grid_tile
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
-
-            if len(grid_tile_data.shape) == 2:
-                grid_tile_data = grid_tile_data[np.newaxis, :]
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('swath_tile'):
-            swath_tile = self._get_nexus_tile().swath_tile
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            tile_data = self._to_standard_index(swath_tile_data,
-                                                (len(time_data), len(latitude_data), len(longitude_data)))
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('time_series_tile'):
-            time_series_tile = self._get_nexus_tile().time_series_tile
-
-            time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data))
-            time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1)
-            latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude))
-
-            reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
-            idx = np.arange(len(latitude_data))
-            reshaped_array[:, idx, idx] = time_series_tile_data
-            tile_data = reshaped_array
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in time_series_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-
-                reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
-                idx = np.arange(len(latitude_data))
-                reshaped_meta_array[:, idx, idx] = meta_array
-
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('swath_multi_variable_tile'):
-            swath_tile = self._get_nexus_tile().swath_multi_variable_tile
-            is_multi_var = True
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            desired_shape = (
-                len(time_data),
-                len(latitude_data),
-                len(longitude_data),
-            )
-            tile_data = self._to_standard_index(swath_tile_data, desired_shape, is_multi_var=True)
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var
-        elif self._get_nexus_tile().HasField('grid_multi_variable_tile'):
-            grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile
-            is_multi_var = True
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.longitude))
-
-            # If there are 3 dimensions, that means the time dimension
-            # was squeezed. Add back in
-            if len(grid_tile_data.shape) == 3:
-                grid_tile_data = np.expand_dims(grid_tile_data, axis=1)
-            # If there are 4 dimensions, that means the time dimension
-            # is present. Move the multivar dimension.
-            if len(grid_tile_data.shape) == 4:
-                grid_tile_data = np.moveaxis(grid_tile_data, -1, 0)
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_multi_variable_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var
-        else:
-            raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile")
-
-    @staticmethod
-    def _to_standard_index(data_array, desired_shape, is_multi_var=False):
-        """
-        Transform swath data to a standard format where data runs along
-        diagonal of ND matrix and the non-diagonal data points are
-        masked
-
-        :param data_array: The data array to be transformed
-        :param desired_shape: The desired shape of the resulting array
-        :param is_multi_var: True if this is a multi-variable tile
-        :type data_array: np.array
-        :type desired_shape: tuple
-        :type is_multi_var: bool
-        :return: Reshaped array
-        :rtype: np.array
-        """
-
-        if desired_shape[0] == 1:
-            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-            reshaped_array = reshaped_array[np.newaxis, :]
-        elif is_multi_var == True:
-            # Break the array up by variable. Translate shape from
-            # len(times) x len(latitudes) x len(longitudes) x num_vars,
-            # to
-            # num_vars x len(times) x len(latitudes) x len(longitudes)
-            reshaped_data_array = np.moveaxis(data_array, -1, 0)
-            reshaped_array = []
-
-            for variable_data_array in reshaped_data_array:
-                variable_reshaped_array = np.ma.masked_all(desired_shape)
-                row, col = np.indices(variable_data_array.shape)
-
-                variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[
-                    row.flat, col.flat]
-                variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[
-                    row.flat, col.flat]
-                reshaped_array.append(variable_reshaped_array)
-        else:
-            reshaped_array = np.ma.masked_all(desired_shape)
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-
-        return reshaped_array
-
 
 class CassandraProxy(object):
     def __init__(self, config):
diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/NexusTileDataBase.py
similarity index 74%
copy from data-access/nexustiles/dao/CassandraProxy.py
copy to data-access/nexustiles/dao/NexusTileDataBase.py
index e410b99..a15e544 100644
--- a/data-access/nexustiles/dao/CassandraProxy.py
+++ b/data-access/nexustiles/dao/NexusTileDataBase.py
@@ -1,48 +1,6 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-from configparser import NoOptionError
-
-import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
-from cassandra.auth import PlainTextAuthProvider
-from cassandra.cqlengine import columns, connection, CQLEngineException
-from cassandra.cqlengine.models import Model
-from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
-from multiprocessing.synchronize import Lock
-from nexusproto.serialization import from_shaped_array
-
-INIT_LOCK = Lock(ctx=None)
-
-logger = logging.getLogger(__name__)
-
-class NexusTileData(Model):
-    __table_name__ = 'sea_surface_temp'
-    tile_id = columns.UUID(primary_key=True)
-    tile_blob = columns.Blob()
-
-    __nexus_tile = None
-
-    def _get_nexus_tile(self):
-        if self.__nexus_tile is None:
-            self.__nexus_tile = nexusproto.TileData.FromString(self.tile_blob)
-
-        return self.__nexus_tile
 
+class NexusTileDataBase:
     def get_raw_data_array(self):
 
         nexus_tile = self._get_nexus_tile()
@@ -261,55 +219,3 @@ class NexusTileData(Model):
                 row.flat, col.flat]
 
         return reshaped_array
-
-
-class CassandraProxy(object):
-    def __init__(self, config):
-        self.config = config
-        self.__cass_url = config.get("cassandra", "host")
-        self.__cass_username = config.get("cassandra", "username")
-        self.__cass_password = config.get("cassandra", "password")
-        self.__cass_keyspace = config.get("cassandra", "keyspace")
-        self.__cass_local_DC = config.get("cassandra", "local_datacenter")
-        self.__cass_protocol_version = config.getint("cassandra", "protocol_version")
-        self.__cass_dc_policy = config.get("cassandra", "dc_policy")
-
-        try:
-            self.__cass_port = config.getint("cassandra", "port")
-        except NoOptionError:
-            self.__cass_port = 9042
-
-        with INIT_LOCK:
-            try:
-                connection.get_cluster()
-            except CQLEngineException:
-                self.__open()
-
-    def __open(self):
-        if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy':
-            dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC)
-            token_policy = TokenAwarePolicy(dc_policy)
-        elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy':
-            token_policy = WhiteListRoundRobinPolicy([self.__cass_url])
-
-        if self.__cass_username and self.__cass_password:
-            auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password)
-        else:
-            auth_provider = None
-
-        connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace,
-                         protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy,
-                         port=self.__cass_port,
-                         auth_provider=auth_provider)
-
-    def fetch_nexus_tiles(self, *tile_ids):
-        tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
-                    (isinstance(tile_id, str) or isinstance(tile_id, str))]
-
-        res = []
-        for tile_id in tile_ids:
-            filterResults = NexusTileData.objects.filter(tile_id=tile_id)
-            if len(filterResults) > 0:
-                res.append(filterResults[0])
-
-        return res
diff --git a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/dao/S3Proxy.py
index c8d3adf..b491a90 100644
--- a/data-access/nexustiles/dao/S3Proxy.py
+++ b/data-access/nexustiles/dao/S3Proxy.py
@@ -19,6 +19,7 @@ import boto3
 import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
 from nexusproto.serialization import from_shaped_array
+from .NexusTileDataBase import NexusTileDataBase
 
 
 class NexusTileData(object):
@@ -38,87 +39,6 @@ class NexusTileData(object):
 
         return self.__nexus_tile
 
-    def get_raw_data_array(self):
-
-        nexus_tile = self._get_nexus_tile()
-        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
-
-        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
-
-        return from_shaped_array(the_tile_data.variable_data)
-
-    def get_lat_lon_time_data_meta(self):
-        if self._get_nexus_tile().HasField('grid_tile'):
-            grid_tile = self._get_nexus_tile().grid_tile
-
-            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
-            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
-            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
-
-            if len(grid_tile_data.shape) == 2:
-                grid_tile_data = grid_tile_data[np.newaxis, :]
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in grid_tile.meta_data:
-                name = meta_data_obj.name
-                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                if len(meta_array.shape) == 2:
-                    meta_array = meta_array[np.newaxis, :]
-                meta_data[name] = meta_array
-
-            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
-        elif self._get_nexus_tile().HasField('swath_tile'):
-            swath_tile = self._get_nexus_tile().swath_tile
-
-            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
-            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
-            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
-
-            # Simplify the tile if the time dimension is the same value repeated
-            if np.all(time_data == np.min(time_data)):
-                time_data = np.array([np.min(time_data)])
-
-            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
-
-            tile_data = self._to_standard_index(swath_tile_data,
-                                                (len(time_data), len(latitude_data), len(longitude_data)))
-
-            # Extract the meta data
-            meta_data = {}
-            for meta_data_obj in swath_tile.meta_data:
-                name = meta_data_obj.name
-                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
-                meta_data[name] = reshaped_meta_array
-
-            return latitude_data, longitude_data, time_data, tile_data, meta_data
-        else:
-            raise NotImplementedError("Only supports grid_tile and swath_tile")
-
-    @staticmethod
-    def _to_standard_index(data_array, desired_shape):
-
-        if desired_shape[0] == 1:
-            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-            reshaped_array = reshaped_array[np.newaxis, :]
-        else:
-            reshaped_array = np.ma.masked_all(desired_shape)
-            row, col = np.indices(data_array.shape)
-
-            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
-                row.flat, col.flat]
-            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
-                row.flat, col.flat]
-
-        return reshaped_array
-
 
 class S3Proxy(object):
     def __init__(self, config):