You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by tl...@apache.org on 2021/12/06 16:23:36 UTC

[incubator-sdap-nexus] 01/01: add mutiband search algorithm

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch multiband
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 5eb8a05306bc97305ffd12f97cdfe14d481afe99
Author: Thomas Loubrieu <lo...@jpl.nasa.gov>
AuthorDate: Mon Dec 6 11:23:21 2021 -0500

    add mutiband search algorithm
---
 analysis/README.md                                 |   3 +-
 .../webservice/algorithms/DataInBoundsSearchMB.py  | 280 +++++++++++++++++++++
 analysis/webservice/algorithms/__init__.py         |   1 +
 data-access/nexustiles/dao/SolrProxy.py            |   3 +-
 data-access/nexustiles/model/nexusmodel.py         |  97 ++++++-
 data-access/nexustiles/nexustiles.py               |   7 +-
 helm/templates/granule-ingester.yml                |   2 +-
 helm/templates/solr-create-collection.yml          |   6 +-
 8 files changed, 377 insertions(+), 22 deletions(-)

diff --git a/analysis/README.md b/analysis/README.md
index a55841b..5b057b1 100644
--- a/analysis/README.md
+++ b/analysis/README.md
@@ -10,7 +10,7 @@ Python module that exposes NEXUS analytical capabilities via a HTTP webservice.
 1. Setup a separate conda env or activate an existing one
 
     ````
-    conda create --name nexus-analysis python=2.7.17
+    conda create --name nexus-analysis python=3.7
     conda activate nexus-analysis
     ````
 
@@ -21,6 +21,7 @@ Python module that exposes NEXUS analytical capabilities via a HTTP webservice.
     conda install pyspark
     conda install -c conda-forge --file conda-requirements.txt
     #conda install numpy matplotlib mpld3 scipy netCDF4 basemap gdal pyproj=1.9.5.1 libnetcdf=4.3.3.1
+    pip install -e '.[dev]'
     ````
 
 3. Update the configuration for solr and cassandra
diff --git a/analysis/webservice/algorithms/DataInBoundsSearchMB.py b/analysis/webservice/algorithms/DataInBoundsSearchMB.py
new file mode 100644
index 0000000..401a478
--- /dev/null
+++ b/analysis/webservice/algorithms/DataInBoundsSearchMB.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+from datetime import datetime
+import math
+import numpy as np
+from pytz import timezone
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class DataInBoundsSearchCalcMBHandlerImpl(NexusCalcHandler):
+    name = "Data In-Bounds Search"
+    path = "/datainboundsmb"
+    description = "Fetches point values for a given dataset and geographical area"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "The Dataset shortname to use in calculation. Required"
+        },
+        "parameter": {
+            "name": "Parameter",
+            "type": "string",
+            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required if 'metadataFilter' not provided"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "metadataFilter": {
+            "name": "Metadata Filter",
+            "type": "string",
+            "description": "Filter in format key:value. Required if 'b' not provided"
+        }
+    }
+    singleton = True
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+
+        try:
+            ds = request.get_dataset()[0]
+        except:
+            raise NexusProcessingException(reason="'ds' argument is required", code=400)
+
+        parameter_s = request.get_argument('parameter', None)
+        if parameter_s not in ['sst', 'sss', 'wind', None]:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+            start_time = int((start_time - EPOCH).total_seconds())
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+            end_time = int((end_time - EPOCH).total_seconds())
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        bounding_polygon = metadata_filter = None
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            metadata_filter = request.get_metadata_filter()
+            if 0 == len(metadata_filter):
+                raise NexusProcessingException(
+                    reason="'b' or 'metadataFilter' argument is required. 'b' must be comma-delimited float formatted "
+                           "as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, "
+                           "Maximum (Northern) Latitude. 'metadataFilter' must be in the form key:value",
+                    code=400)
+
+        return ds, parameter_s, start_time, end_time, bounding_polygon, metadata_filter
+
+    @staticmethod
+    def accept_point(nexus_point):
+        return isinstance(nexus_point.latitude, np.float64) \
+            and isinstance(nexus_point.longitude, np.float64) \
+            and isinstance(nexus_point.time, np.int64)
+
+    def calc(self, computeOptions, **args):
+        ds, parameter, start_time, end_time, bounding_polygon, metadata_filter = self.parse_arguments(computeOptions)
+
+        includemeta = computeOptions.get_include_meta()
+
+        min_lat = max_lat = min_lon = max_lon = None
+        if bounding_polygon:
+            min_lat = bounding_polygon.bounds[1]
+            max_lat = bounding_polygon.bounds[3]
+            min_lon = bounding_polygon.bounds[0]
+            max_lon = bounding_polygon.bounds[2]
+
+            tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+                                                                end_time)
+        else:
+            tiles = self._get_tile_service().get_tiles_by_metadata(metadata_filter, ds, start_time, end_time)
+
+        data = []
+        """
+        array_x = np.array([-2, -1, 0, 1, 2, 3])
+        multiply = np.broadcast_to(array_x[:, np.newaxis, np.newaxis], (array_x.size, 30, 30))
+        """
+        for tile in tiles:
+            for nexus_point in tile.nexus_point_generator_multi_band(include_nan=False):
+                if DataInBoundsSearchCalcMBHandlerImpl.accept_point(nexus_point):
+                    point = dict()
+                    point['id'] = tile.tile_id
+
+                    if parameter == 'sst':
+                        point['sst'] = nexus_point.data_val
+                    elif parameter == 'sss':
+                        point['sss'] = nexus_point.data_val
+                    elif parameter == 'wind':
+                        point['wind_u'] = nexus_point.data_val
+                        try:
+                            point['wind_v'] = tile.meta_data['wind_v'][tuple(nexus_point.index)]
+                        except (KeyError, IndexError):
+                            pass
+                        try:
+                            point['wind_direction'] = tile.meta_data['wind_dir'][tuple(nexus_point.index)]
+                        except (KeyError, IndexError):
+                            pass
+                        try:
+                            point['wind_speed'] = tile.meta_data['wind_speed'][tuple(nexus_point.index)]
+                        except (KeyError, IndexError):
+                            pass
+                    else:
+                        point['variable'] = nexus_point.data_val
+                        # TODO hardcoding EVI calculation for demo
+                        point['evi'] = DataInBoundsSearchCalcMBHandlerImpl.evi_on_time_series(point['variable'].data)
+                        data.append({
+                            'latitude': nexus_point.latitude,
+                            'longitude': nexus_point.longitude,
+                            'time': nexus_point.time,
+                            'data': [
+                                point['evi']
+                            ]
+                        })
+
+        if includemeta and len(tiles) > 0:
+            meta = [tile.get_summary() for tile in tiles]
+        else:
+            meta = None
+
+        result = DataInBoundsResult(
+            results=data,
+            stats={},
+            meta=meta)
+
+        result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+
+        return result
+
+    @staticmethod
+    def evi_on_time_series(band_val_tim_series):
+        return [DataInBoundsSearchCalcMBHandlerImpl.calculate_evi(v) for v in band_val_tim_series]
+
+    @staticmethod
+    def calculate_evi(band_vals):
+        """
+        Hardcoded method for HLS data.
+        Assuming incoming point has 6 data points.
+        Assuming they are in this order:
+        band 2; blue
+        band 3; green
+        band 4; red
+        band 5; nir
+        band 6; swirOne
+        band 7; swirTwo
+        calculating this formula:   if (whatIndex == 'evi2') {index <- 2.5*(nir - red) / (nir + 2.4*red + 1)}
+        """
+        x_weights = [0, 0, -2.5, 2.5, 0, 0]
+        y_weights = [0, 0, 2.4, 1, 0, 0]
+        x_constant = 0
+        y_constant = 1
+        if len(band_vals) != len(x_weights):
+            logger.warning(f'nexus_point array size is different from x_weights. not calculating')
+            return None
+        x = sum(band_vals * x_weights) + x_constant
+        y = sum(band_vals * y_weights) + y_constant
+        if math.isnan(x) or math.isnan(y):
+            logger.error(f'x or y is resulted in NaN. not calculating. {x} / {y}')
+            return None
+        if y == 0:
+            logger.warning(f'y is None after multiplying. not calculating')
+            return None
+        return x / y
+
+
+class DataInBoundsResult(NexusResults):
+    def toCSV(self):
+        rows = []
+
+        headers = [
+            "id",
+            "lon",
+            "lat",
+            "time"
+        ]
+
+        for i, result in enumerate(self.results()):
+            cols = []
+
+            cols.append(str(result['data'][0]['id']))
+            cols.append(str(result['longitude']))
+            cols.append(str(result['latitude']))
+            cols.append(datetime.utcfromtimestamp(result["time"]).strftime('%Y-%m-%dT%H:%M:%SZ'))
+            if 'sst' in result['data'][0]:
+                cols.append(str(result['data'][0]['sst']))
+                if i == 0:
+                    headers.append("sea_water_temperature")
+            elif 'sss' in result['data'][0]:
+                cols.append(str(result['data'][0]['sss']))
+                if i == 0:
+                    headers.append("sea_water_salinity")
+            elif 'wind_u' in result['data'][0]:
+                cols.append(str(result['data'][0]['wind_u']))
+                cols.append(str(result['data'][0]['wind_v']))
+                cols.append(str(result['data'][0]['wind_direction']))
+                cols.append(str(result['data'][0]['wind_speed']))
+                if i == 0:
+                    headers.append("eastward_wind")
+                    headers.append("northward_wind")
+                    headers.append("wind_direction")
+                    headers.append("wind_speed")
+
+            if i == 0:
+                rows.append(",".join(headers))
+            rows.append(",".join(cols))
+
+        return "\r\n".join(rows)
\ No newline at end of file
diff --git a/analysis/webservice/algorithms/__init__.py b/analysis/webservice/algorithms/__init__.py
index 6063009..d9c6492 100644
--- a/analysis/webservice/algorithms/__init__.py
+++ b/analysis/webservice/algorithms/__init__.py
@@ -18,6 +18,7 @@ from . import Capabilities
 from . import CorrelationMap
 from . import DailyDifferenceAverage
 from . import DataInBoundsSearch
+from . import DataInBoundsSearchMB
 from . import DataSeriesList
 from . import DelayTest
 from . import ErrorTosserTest
diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/dao/SolrProxy.py
index 39eed37..d5cab55 100644
--- a/data-access/nexustiles/dao/SolrProxy.py
+++ b/data-access/nexustiles/dao/SolrProxy.py
@@ -654,7 +654,8 @@ class SolrProxy(object):
             response = self.do_query_raw(*args, **params)
             results.extend(response.docs)
 
-        assert len(results) == limit
+        # often does not work if tiles are being ingested during the request
+        #assert len(results) == limit
 
         return results
 
diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py
index 753d264..e493484 100644
--- a/data-access/nexustiles/model/nexusmodel.py
+++ b/data-access/nexustiles/model/nexusmodel.py
@@ -14,12 +14,19 @@
 # limitations under the License.
 
 from collections import namedtuple
-
+import logging
+import sys
 import numpy as np
+import math
 from dataclasses import dataclass
 
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
 
-NexusPoint = namedtuple('NexusPoint', 'latitude longitude depth time index data_vals')
+NexusPoint = namedtuple('NexusPoint', 'latitude longitude depth time index data_val')
 BBox = namedtuple('BBox', 'min_lat max_lat min_lon max_lon')
 TileStats = namedtuple('TileStats', 'min max mean count')
 
@@ -123,25 +130,89 @@ class Tile(object):
         except AttributeError:
             summary['meta_data'] = 'None'
 
+        try:
+            summary['variables'] = [tv.variable_name for tv in summary['variables']]
+        except AttributeError:
+            summary['variables'] = []
+
         return summary
 
+    def __generate_point(self, index):
+        try:
+            # original
+            time = self.times[index[0]]
+            lat = self.latitudes[index[1]]
+            lon = self.longitudes[index[2]]
+            data_val = self.data[index]
+            logger.debug(f'__generate_point: index: {index}')
+            logger.debug(
+                f'__generate_point: self.data shape: {self.data.shape}. time length: {len(self.times)}. lat len: {len(self.latitudes)}, lon len: {len(self.longitudes)}')
+            # time = self.times[index[2]]
+            # lat = self.latitudes[index[0]]
+            # lon = self.longitudes[index[1]]
+            # time = self.times[index[2]]
+            # data_val = self.data[index]
+            point = NexusPoint(lat, lon, None, time, index, data_val)
+        except Exception as e:
+            logger.error(
+                f'times: {self.times}. lat: {self.latitudes}. lon: {self.longitudes}. data: {self.data}. index: {index}')
+            raise e
+        return point
+
+    def __generate_point_multi_band(self, index, transposed_data):
+        try:
+            # original
+            time = self.times[0]
+            lat = self.latitudes[index[0]]
+            lon = self.longitudes[index[1]]
+            data_val = transposed_data[index[0]][index[1]]
+            logger.debug(f'__generate_point: index: {index}')
+            logger.debug(
+                f'__generate_point: transposed_data shape: {transposed_data.shape}. time length: {len(self.times)}. lat len: {len(self.latitudes)}, lon len: {len(self.longitudes)}')
+            # time = self.times[index[2]]
+            # lat = self.latitudes[index[0]]
+            # lon = self.longitudes[index[1]]
+            # time = self.times[index[2]]
+            # data_val = self.data[index]
+            point = NexusPoint(lat, lon, None, time, index, data_val)
+        except Exception as e:
+            logger.error(
+                f'times: {self.times}. lat: {self.latitudes}. lon: {self.longitudes}. data: {transposed_data}. index: {index}')
+            raise e
+        return point
+
+    def nexus_point_generator_multi_band(self, include_nan=False):
+        logger.debug(f'existing data dimension: {type(self.data)}')
+        # TODO performed a quickfix. since the new grid multivariable processing does not squeeze. it becomes 4D instead of 3D. It is breaking this method. So, it is squeezed. And it no longer needs to transpose as the variables are the last dimension
+        # transposed_data = np.transpose(self.data, (1, 2, 0))
+        transposed_data = np.transpose(self.data)
+        logger.debug(f'sample data: {transposed_data[0][0][0]}. shape: {type(transposed_data[0][0][0])}')
+        if include_nan:
+            for ij in np.ndindex(transposed_data.shape[:1]):
+                logger.debug(f'nexus_point_generator_multi_band#include_nan#index: {ij}')
+                yield self.__generate_point_multi_band(ij, transposed_data)
+        else: ## NOT TESTED DON't USE
+            for index in np.transpose(np.ma.nonzero(np.count_nonzero(transposed_data, axis=2))):
+                logger.debug(f'nexus_point_generator_multi_band#not_include_nan#index: {index}')
+                yield self.__generate_point_multi_band(index, transposed_data)
+        return
+
+    def remove_nan_data(self):
+        tile_data = self.data
+        logger.info(f'tile_data: {tile_data[0][0][0]}. type: {type(tile_data[0][0][0])}')
+        transposed_data = np.transpose(self.data, (1, 2, 0))
+        return transposed_data
+
     def nexus_point_generator(self, include_nan=False):
         if include_nan:
             for index in np.ndindex(self.data.shape):
-                time = self.times[index[0]]
-                lat = self.latitudes[index[1]]
-                lon = self.longitudes[index[2]]
-                data_val = self.data[index]
-                point = NexusPoint(lat, lon, None, time, index, data_val)
+                logger.debug(f'index: {index}')
+                point = self.__generate_point(index)
                 yield point
         else:
             for index in np.transpose(np.ma.nonzero(self.data)):
-                index = tuple(index)
-                time = self.times[index[0]]
-                lat = self.latitudes[index[1]]
-                lon = self.longitudes[index[2]]
-                data_val = self.data[index]
-                point = NexusPoint(lat, lon, None, time, index, data_val)
+                logger.debug(f'index: {index}')
+                point = self.__generate_point(tuple(index))
                 yield point
 
     def get_indices(self, include_nan=False):
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 7483c2b..fb7a2c8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -419,9 +419,10 @@ class NexusTileService(object):
                 tile.times = ma.masked_outside(tile.times, start_time, end_time)
 
                 # Or together the masks of the individual arrays to create the new mask
-                data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \
-                            | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \
-                            | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :]
+                data_mask = ma.getmaskarray(tile.variables)[:, np.newaxis, np.newaxis, np.newaxis] \
+                            | ma.getmaskarray(tile.times)[np.newaxis, :, np.newaxis, np.newaxis] \
+                            | ma.getmaskarray(tile.latitudes)[np.newaxis, np.newaxis, :, np.newaxis] \
+                            | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, np.newaxis, :]
 
                 tile.data = ma.masked_where(data_mask, tile.data)
 
diff --git a/helm/templates/granule-ingester.yml b/helm/templates/granule-ingester.yml
index 405edb8..9cfc7c7 100644
--- a/helm/templates/granule-ingester.yml
+++ b/helm/templates/granule-ingester.yml
@@ -36,7 +36,7 @@ spec:
             - name: ZK_HOST_AND_PORT
               value: {{ include "nexus.urls.zookeeper" . }}
             {{ if .Values.ingestion.granuleIngester.maxConcurrency }}
-            - name: MAX_CONCURRENCY
+            - name: MAX_THREADS
               value: "{{ .Values.ingestion.granuleIngester.maxConcurrency }}"
             {{ end }}
             {{- range $name, $value := .Values.ingestion.granules.s3.awsCredsEnvs }}
diff --git a/helm/templates/solr-create-collection.yml b/helm/templates/solr-create-collection.yml
index 717cb42..74451c9 100644
--- a/helm/templates/solr-create-collection.yml
+++ b/helm/templates/solr-create-collection.yml
@@ -19,8 +19,8 @@ spec:
         image: nexusjpl/solr-cloud-init:1.0.2
         resources:
           requests:
-            memory: "0.5Gi"
-            cpu: "0.25"
+            memory: "0.1Gi"
+            cpu: "0.1"
         env:
         - name: MINIMUM_NODES
           value: "{{ .Values.solr.replicaCount }}"
@@ -31,4 +31,4 @@ spec:
         - name: CREATE_COLLECTION_PARAMS
           value: "name=nexustiles&numShards=$(MINIMUM_NODES)&waitForFinalState=true"
       restartPolicy: Always
-{{ end }}
\ No newline at end of file
+{{ end }}