You are viewing a plain text version of this content. The canonical link to the original message is not preserved in this plain-text version.
Posted to commits@sdap.apache.org by rk...@apache.org on 2023/04/13 17:51:27 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new a6e858b SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)
a6e858b is described below
commit a6e858b2b703922c8dcdc0839f3937bddb0576e5
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Thu Apr 13 10:51:21 2023 -0700
SDAP-435 + SDAP-437 - Granule preprocessing and timedelta array support (#73)
* Fix for time array being timedelta type
Ingest tested
Tile usability untested
* Changelog
* CM: Handle squeeze dimensions in config & pass to RMQ
* GI: Handle squeeze in GranuleLoader
* GI: Handle squeeze in rmq message
* GI: Changed some logging messages to DEBUG
* Preprocessor package
* Preprocessor package
* CM: Handle preprocess section in CC
* GI: Apply preprocessors to input granules on open
* Changelog
* Cleanup
Cleaned up imports & removed previous squeeze implementation
* Minor: Adjust level of logging statement
---------
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
CHANGELOG.md | 2 ++
.../collection_manager/entities/Collection.py | 9 ++++--
.../services/CollectionProcessor.py | 7 ++--
.../granule_loaders/GranuleLoader.py | 37 +++++++++++++++++++---
.../granule_loaders/Preprocessors.py | 22 +++++++++++++
.../granule_ingester/pipeline/Pipeline.py | 11 ++++---
.../preprocessors/GranulePreprocessor.py | 24 ++++++++++++++
.../granule_ingester/preprocessors/Squeeze.py | 32 +++++++++++++++++++
.../granule_ingester/preprocessors/__init__.py | 18 +++++++++++
.../reading_processors/TileReadingProcessor.py | 11 +++++--
10 files changed, 157 insertions(+), 16 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5091938..79b8e24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,12 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
+- SDAP-437: Added support for preprocessing of input granules. The initial implementation provides a single preprocessor, which squeezes one or more dimensions so that the dataset has the required shape.
### Changed
### Deprecated
### Removed
### Fixed
- SDAP-423: Fixed verbosity settings not propagating to ingester subprocesses
- SDAP-417: Fixed bug where very spatially narrow tiles would have their WKT for the geo field represent the incorrect shape (ie a very narrow polygon being rounded to a line), which would cause an error on write to Solr.
+- SDAP-435: Added case for handling time arrays of type `np.timedelta64`
### Security
## [1.0.0] - 2022-12-05
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index ee1f696..d657bae 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -21,8 +21,7 @@ from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from fnmatch import fnmatch
-from glob import glob
-from typing import List, Optional
+from typing import Optional
from urllib.parse import urlparse
from collection_manager.entities.exceptions import MissingValueCollectionError
@@ -47,6 +46,7 @@ class Collection:
forward_processing_priority: Optional[int] = None
date_from: Optional[datetime] = None
date_to: Optional[datetime] = None
+ preprocess: str = None
@staticmethod
def __decode_dimension_names(dimension_names_dict):
@@ -79,6 +79,8 @@ class Collection:
date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
+ preprocess = json.dumps(properties['preprocess']) if 'preprocess' in properties else None
+
collection = Collection(dataset_id=properties['id'],
projection=properties['projection'],
dimension_names=frozenset(Collection.__decode_dimension_names(properties['dimensionNames'])),
@@ -87,7 +89,8 @@ class Collection:
historical_priority=properties['priority'],
forward_processing_priority=properties.get('forward-processing-priority', None),
date_to=date_to,
- date_from=date_from)
+ date_from=date_from,
+ preprocess=preprocess)
return collection
except KeyError as e:
raise MissingValueCollectionError(missing_value=e.args[0])
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 28f2983..bbf6bf9 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
import logging
import os.path
-from glob import glob
from typing import Dict
-from datetime import datetime
import yaml
from collection_manager.entities import Collection
@@ -122,6 +121,10 @@ class CollectionProcessor:
},
'processors': CollectionProcessor._get_default_processors(collection)
}
+
+ if collection.preprocess is not None:
+ config_dict['preprocess'] = json.loads(collection.preprocess)
+
config_str = yaml.dump(config_dict)
logger.debug(f"Templated dataset config:\n{config_str}")
return config_str
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index 6377de0..6a7978f 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -20,8 +20,9 @@ from urllib import parse
import aioboto3
import xarray as xr
-
-from granule_ingester.exceptions import GranuleLoadingError
+from granule_ingester.exceptions import GranuleLoadingError, PipelineBuildingError
+from granule_ingester.granule_loaders.Preprocessors import modules as module_mappings
+from granule_ingester.preprocessors import GranulePreprocessor
logger = logging.getLogger(__name__)
@@ -29,10 +30,12 @@ logger = logging.getLogger(__name__)
class GranuleLoader:
def __init__(self, resource: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
self._granule_temp_file = None
self._resource = resource
+ self._preprocess = None
+
+ if 'preprocess' in kwargs:
+ self._preprocess = [GranuleLoader._parse_module(module) for module in kwargs['preprocess']]
async def __aenter__(self):
return await self.open()
@@ -55,7 +58,16 @@ class GranuleLoader:
granule_name = os.path.basename(self._resource)
try:
- return xr.open_dataset(file_path, lock=False), granule_name
+ ds = xr.open_dataset(file_path, lock=False)
+
+ if self._preprocess is not None:
+ logger.info(f'There are {len(self._preprocess)} preprocessors to apply for granule {self._resource}')
+ while len(self._preprocess) > 0:
+ preprocessor: GranulePreprocessor = self._preprocess.pop(0)
+
+ ds = preprocessor.process(ds)
+
+ return ds, granule_name
except FileNotFoundError:
raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
except Exception:
@@ -76,3 +88,18 @@ class GranuleLoader:
fp.write(data)
logger.info("Saved downloaded file to {}.".format(fp.name))
return fp
+
+ @staticmethod
+ def _parse_module(module_config: dict):
+ module_name = module_config.pop('name')
+ try:
+ module_class = module_mappings[module_name]
+ logger.debug("Loaded preprocessor {}.".format(module_class))
+ processor_module = module_class(**module_config)
+ except KeyError:
+ raise PipelineBuildingError(f"'{module_name}' is not a valid preprocessor.")
+ except Exception as e:
+ raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}")
+
+ return processor_module
+
diff --git a/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py b/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py
new file mode 100644
index 0000000..f091fe2
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/Preprocessors.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, Type
+
+from granule_ingester.preprocessors import (GranulePreprocessor, Squeeze)
+
+modules: Dict[str, Type[GranulePreprocessor]] = {
+ 'squeeze': Squeeze
+}
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 41bfc3a..9ebb529 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -69,7 +69,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
async def _process_tile_in_worker(serialized_input_tile: str):
try:
- logger.info('Starting tile creation subprocess')
+ logger.debug('Starting tile creation subprocess')
logger.debug(f'serialized_input_tile: {serialized_input_tile}')
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
logger.info(f'Creating tile for slice {input_tile.summary.section_spec}')
@@ -79,11 +79,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
logger.info('Processed tile is empty; adding None result to return')
return None
- logger.info('Tile processing complete; serializing output tile')
+ logger.debug('Tile processing complete; serializing output tile')
serialized_output_tile = nexusproto.NexusTile.SerializeToString(processed_tile)
- logger.info('Adding serialized result to return')
+ logger.debug('Adding serialized result to return')
return serialized_output_tile
except Exception as e:
@@ -171,7 +171,10 @@ class Pipeline:
module_mappings: dict,
max_concurrency: int):
try:
- granule_loader = GranuleLoader(**config['granule'])
+ if 'preprocess' in config:
+ granule_loader = GranuleLoader(**config['granule'], **{'preprocess': config['preprocess']})
+ else:
+ granule_loader = GranuleLoader(**config['granule'])
slicer_config = config['slicer']
slicer = cls._parse_module(slicer_config, module_mappings)
diff --git a/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py b/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py
new file mode 100644
index 0000000..1eb1ca3
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/GranulePreprocessor.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+import xarray as xr
+
+
+class GranulePreprocessor(ABC):
+ @abstractmethod
+ def process(self, input_dataset: xr.Dataset, *args, **kwargs):
+ pass
diff --git a/granule_ingester/granule_ingester/preprocessors/Squeeze.py b/granule_ingester/granule_ingester/preprocessors/Squeeze.py
new file mode 100644
index 0000000..8d944c0
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/Squeeze.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import xarray as xr
+from granule_ingester.preprocessors.GranulePreprocessor import GranulePreprocessor
+
+logger = logging.getLogger(__name__)
+
+
+class Squeeze(GranulePreprocessor):
+ def __init__(self, dimensions: list):
+ self._dimensions = dimensions
+
+ def process(self, input_dataset: xr.Dataset, *args, **kwargs):
+ logger.debug(f'Squeezing dimensions {self._dimensions}')
+ output_ds = input_dataset.squeeze(self._dimensions)
+ logger.debug(f'Squeezed dimensions: {input_dataset.dims} -> {output_ds.dims}')
+ return output_ds
diff --git a/granule_ingester/granule_ingester/preprocessors/__init__.py b/granule_ingester/granule_ingester/preprocessors/__init__.py
new file mode 100644
index 0000000..6f55d91
--- /dev/null
+++ b/granule_ingester/granule_ingester/preprocessors/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from granule_ingester.preprocessors.GranulePreprocessor import GranulePreprocessor
+from granule_ingester.preprocessors.Squeeze import Squeeze
+
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 68561e2..a6c3f2e 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -20,10 +20,9 @@ from typing import Dict, Union
import numpy as np
import xarray as xr
-from nexusproto import DataTile_pb2 as nexusproto
-
from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
+from nexusproto import DataTile_pb2 as nexusproto
logger = logging.getLogger(__name__)
@@ -53,6 +52,7 @@ class TileReadingProcessor(TileProcessor, ABC):
return self._generate_tile(dataset, dimensions_to_slices, output_tile)
except Exception as e:
+ logger.exception(e)
raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.")
@abstractmethod
@@ -86,5 +86,12 @@ class TileReadingProcessor(TileProcessor, ABC):
def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
if times.dtype == np.float32:
return times
+ elif times.dtype.type == np.timedelta64: # If time is an array of offsets from a fixed reference
+ reference = times.time.item() / 1e9 # Get the base time in seconds
+
+ times = (times / 1e9).astype(int) # Convert offset array to seconds
+ times = times.where(times != -9223372036854775808) # Replace NaT values with NaN
+
+ return times + reference # Add base to offsets
epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
return ((times - epoch) / 1e9).astype(int)