You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by rk...@apache.org on 2023/04/13 17:51:27 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6e858b  SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)
a6e858b is described below

commit a6e858b2b703922c8dcdc0839f3937bddb0576e5
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Thu Apr 13 10:51:21 2023 -0700

    SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)
    
    * Fix for time array being timedelta type
    
    Ingest tested
    Tile usability untested
    
    * Changelog
    
    * CM: Handle squeeze dimensions in config & pass to RMQ
    
    * GI: Handle squeeze in GranuleLoader
    
    * GI: Handle squeeze in rmq message
    
    * GI: Changed some logging messages to DEBUG
    
    * Preprocessor package
    
    * Preprocessor package
    
    * CM: Handle preprocess section in CC
    
    * GI: Apply preprocessors to input granules on open
    
    * Changelog
    
    * Cleanup
    
    Cleaned up imports & removed previous squeeze implementation
    
    * Minor: Adjust level of logging statement
    
    ---------
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 CHANGELOG.md                                       |  2 ++
 .../collection_manager/entities/Collection.py      |  9 ++++--
 .../services/CollectionProcessor.py                |  7 ++--
 .../granule_loaders/GranuleLoader.py               | 37 +++++++++++++++++++---
 .../granule_loaders/Preprocessors.py               | 22 +++++++++++++
 .../granule_ingester/pipeline/Pipeline.py          | 11 ++++---
 .../preprocessors/GranulePreprocessor.py           | 24 ++++++++++++++
 .../granule_ingester/preprocessors/Squeeze.py      | 32 +++++++++++++++++++
 .../granule_ingester/preprocessors/__init__.py     | 18 +++++++++++
 .../reading_processors/TileReadingProcessor.py     | 11 +++++--
 10 files changed, 157 insertions(+), 16 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5091938..79b8e24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,12 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 ### Added
+- SDAP-437: Added support for preprocessing of input granules. Initial implementation contains one preprocessor implementation for squeezing one or more dimensions to ensure the dataset is shaped as needed.
 ### Changed
 ### Deprecated
 ### Removed
 ### Fixed
 - SDAP-423: Fixed verbosity settings not propagating to ingester subprocesses
 - SDAP-417: Fixed bug where very spatially narrow tiles would have their WKT for the geo field represent the incorrect shape (ie a very narrow polygon being rounded to a line), which would cause an error on write to Solr.
+- SDAP-435: Added case for handling time arrays of type `np.timedelta64`
 ### Security
 
 ## [1.0.0] - 2022-12-05
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index ee1f696..d657bae 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -21,8 +21,7 @@ from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
 from fnmatch import fnmatch
-from glob import glob
-from typing import List, Optional
+from typing import Optional
 from urllib.parse import urlparse
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
@@ -47,6 +46,7 @@ class Collection:
     forward_processing_priority: Optional[int] = None
     date_from: Optional[datetime] = None
     date_to: Optional[datetime] = None
+    preprocess: str = None
 
     @staticmethod
     def __decode_dimension_names(dimension_names_dict):
@@ -79,6 +79,8 @@ class Collection:
             date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
             date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
 
+            preprocess = json.dumps(properties['preprocess']) if 'preprocess' in properties else None
+
             collection = Collection(dataset_id=properties['id'],
                                     projection=properties['projection'],
                                     dimension_names=frozenset(Collection.__decode_dimension_names(properties['dimensionNames'])),
@@ -87,7 +89,8 @@ class Collection:
                                     historical_priority=properties['priority'],
                                     forward_processing_priority=properties.get('forward-processing-priority', None),
                                     date_to=date_to,
-                                    date_from=date_from)
+                                    date_from=date_from,
+                                    preprocess=preprocess)
             return collection
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 28f2983..bbf6bf9 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import logging
 import os.path
-from glob import glob
 from typing import Dict
-from datetime import datetime
 
 import yaml
 from collection_manager.entities import Collection
@@ -122,6 +121,10 @@ class CollectionProcessor:
             },
             'processors': CollectionProcessor._get_default_processors(collection)
         }
+
+        if collection.preprocess is not None:
+            config_dict['preprocess'] = json.loads(collection.preprocess)
+
         config_str = yaml.dump(config_dict)
         logger.debug(f"Templated dataset config:\n{config_str}")
         return config_str
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index 6377de0..6a7978f 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -20,8 +20,9 @@ from urllib import parse
 
 import aioboto3
 import xarray as xr
-
-from granule_ingester.exceptions import GranuleLoadingError
+from granule_ingester.exceptions import GranuleLoadingError, PipelineBuildingError
+from granule_ingester.granule_loaders.Preprocessors import modules as module_mappings
+from granule_ingester.preprocessors import GranulePreprocessor
 
 logger = logging.getLogger(__name__)
 
@@ -29,10 +30,12 @@ logger = logging.getLogger(__name__)
 class GranuleLoader:
 
     def __init__(self, resource: str, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
         self._granule_temp_file = None
         self._resource = resource
+        self._preprocess = None
+
+        if 'preprocess' in kwargs:
+            self._preprocess = [GranuleLoader._parse_module(module) for module in kwargs['preprocess']]
 
     async def __aenter__(self):
         return await self.open()
@@ -55,7 +58,16 @@ class GranuleLoader:
 
         granule_name = os.path.basename(self._resource)
         try:
-            return xr.open_dataset(file_path, lock=False), granule_name
+            ds = xr.open_dataset(file_path, lock=False)
+
+            if self._preprocess is not None:
+                logger.info(f'There are {len(self._preprocess)} preprocessors to apply for granule {self._resource}')
+                while len(self._preprocess) > 0:
+                    preprocessor: GranulePreprocessor = self._preprocess.pop(0)
+
+                    ds = preprocessor.process(ds)
+
+            return ds, granule_name
         except FileNotFoundError:
             raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
         except Exception:
@@ -76,3 +88,18 @@ class GranuleLoader:
         fp.write(data)
         logger.info("Saved downloaded file to {}.".format(fp.name))
         return fp
+
+    @staticmethod
+    def _parse_module(module_config: dict):
+        module_name = module_config.pop('name')
+        try:
+            module_class = module_mappings[module_name]
+            logger.debug("Loaded preprocessor {}.".format(module_class))
+            processor_module = module_class(**module_config)
+        except KeyError:
+            raise PipelineBuildingError(f"'{module_name}' is not a valid preprocessor.")
+        except Exception as e:
+            raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}")
+
+        return processor_module
+
diff --git a/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py b/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py
new file mode 100644
index 0000000..f091fe2
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, Type
+
+from granule_ingester.preprocessors import (GranulePreprocessor, Squeeze)
+
+modules: Dict[str, Type[GranulePreprocessor]] = {
+    'squeeze': Squeeze
+}
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 41bfc3a..9ebb529 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -69,7 +69,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
 
 async def _process_tile_in_worker(serialized_input_tile: str):
     try:
-        logger.info('Starting tile creation subprocess')
+        logger.debug('Starting tile creation subprocess')
         logger.debug(f'serialized_input_tile: {serialized_input_tile}')
         input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
         logger.info(f'Creating tile for slice {input_tile.summary.section_spec}')
@@ -79,11 +79,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
             logger.info('Processed tile is empty; adding None result to return')
             return None
 
-        logger.info('Tile processing complete; serializing output tile')
+        logger.debug('Tile processing complete; serializing output tile')
 
         serialized_output_tile = nexusproto.NexusTile.SerializeToString(processed_tile)
 
-        logger.info('Adding serialized result to return')
+        logger.debug('Adding serialized result to return')
 
         return serialized_output_tile
     except Exception as e:
@@ -171,7 +171,10 @@ class Pipeline:
                         module_mappings: dict,
                         max_concurrency: int):
         try:
-            granule_loader = GranuleLoader(**config['granule'])
+            if 'preprocess' in config:
+                granule_loader = GranuleLoader(**config['granule'], **{'preprocess': config['preprocess']})
+            else:
+                granule_loader = GranuleLoader(**config['granule'])
 
             slicer_config = config['slicer']
             slicer = cls._parse_module(slicer_config, module_mappings)
diff --git a/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py b/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py
new file mode 100644
index 0000000..1eb1ca3
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+import xarray as xr
+
+
+class GranulePreprocessor(ABC):
+    @abstractmethod
+    def process(self, input_dataset: xr.Dataset, *args, **kwargs):
+        pass
diff --git a/granule_ingester/granule_ingester/preprocessors/Squeeze.py b/granule_ingester/granule_ingester/preprocessors/Squeeze.py
new file mode 100644
index 0000000..8d944c0
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/Squeeze.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import xarray as xr
+from granule_ingester.preprocessors.GranulePreprocessor import GranulePreprocessor
+
+logger = logging.getLogger(__name__)
+
+
+class Squeeze(GranulePreprocessor):
+    def __init__(self, dimensions: list):
+        self._dimensions = dimensions
+
+    def process(self, input_dataset: xr.Dataset, *args, **kwargs):
+        logger.debug(f'Squeezing dimensions {self._dimensions}')
+        output_ds = input_dataset.squeeze(self._dimensions)
+        logger.debug(f'Squeezed dimensions: {input_dataset.dims} -> {output_ds.dims}')
+        return output_ds
diff --git a/granule_ingester/granule_ingester/preprocessors/__init__.py b/granule_ingester/granule_ingester/preprocessors/__init__.py
new file mode 100644
index 0000000..6f55d91
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from granule_ingester.preprocessors.GranulePreprocessor import GranulePreprocessor
+from granule_ingester.preprocessors.Squeeze import Squeeze
+
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 68561e2..a6c3f2e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -20,10 +20,9 @@ from typing import Dict, Union
 
 import numpy as np
 import xarray as xr
-from nexusproto import DataTile_pb2 as nexusproto
-
 from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
+from nexusproto import DataTile_pb2 as nexusproto
 
 logger = logging.getLogger(__name__)
 
@@ -53,6 +52,7 @@ class TileReadingProcessor(TileProcessor, ABC):
 
             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
         except Exception as e:
+            logger.exception(e)
             raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.")
 
     @abstractmethod
@@ -86,5 +86,12 @@ class TileReadingProcessor(TileProcessor, ABC):
     def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
         if times.dtype == np.float32:
             return times
+        elif times.dtype.type == np.timedelta64:    # If time is an array of offsets from a fixed reference
+            reference = times.time.item() / 1e9     # Get the base time in seconds
+
+            times = (times / 1e9).astype(int)       # Convert offset array to seconds
+            times = times.where(times != -9223372036854775808)  # Replace NaT values with NaN
+
+            return times + reference    # Add base to offsets
         epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
         return ((times - epoch) / 1e9).astype(int)