You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/23 00:49:44 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-245: Move
granule ingester code into this repo (#2)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 08093ee SDAP-245: Move granule ingester code into this repo (#2)
08093ee is described below
commit 08093ee15346713decdc33afab23790f2aee8a76
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Jun 22 17:49:38 2020 -0700
SDAP-245: Move granule ingester code into this repo (#2)
Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
.gitattributes | 1 -
granule_ingester/.gitignore | 9 +
granule_ingester/README.md | 34 +++
granule_ingester/conda-requirements.txt | 10 +
granule_ingester/docker/Dockerfile | 21 ++
granule_ingester/docker/entrypoint.sh | 10 +
granule_ingester/docker/install_nexusproto.sh | 20 ++
granule_ingester/granule_ingester/__init__.py | 0
.../granule_ingester/consumer/Consumer.py | 88 +++++++
.../granule_ingester/consumer/__init__.py | 1 +
.../granule_loaders/GranuleLoader.py | 71 ++++++
.../granule_ingester/granule_loaders/__init__.py | 1 +
.../granule_ingester/healthcheck/HealthCheck.py | 22 ++
.../granule_ingester/healthcheck/__init__.py | 1 +
granule_ingester/granule_ingester/main.py | 118 +++++++++
.../granule_ingester/pipeline/Modules.py | 15 ++
.../granule_ingester/pipeline/Pipeline.py | 158 ++++++++++++
.../granule_ingester/pipeline/__init__.py | 2 +
.../granule_ingester/processors/EmptyTileFilter.py | 42 ++++
.../granule_ingester/processors/GenerateTileId.py | 32 +++
.../granule_ingester/processors/TileProcessor.py | 23 ++
.../processors/TileSummarizingProcessor.py | 98 ++++++++
.../granule_ingester/processors/__init__.py | 5 +
.../granule_ingester/processors/kelvintocelsius.py | 31 +++
.../reading_processors/EccoReadingProcessor.py | 64 +++++
.../reading_processors/GridReadingProcessor.py | 53 +++++
.../reading_processors/SwathReadingProcessor.py | 47 ++++
.../reading_processors/TileReadingProcessor.py | 81 +++++++
.../TimeSeriesReadingProcessor.py | 83 +++++++
.../processors/reading_processors/__init__.py | 5 +
.../slicers/SliceFileByDimension.py | 55 +++++
.../slicers/SliceFileByStepSize.py | 55 +++++
.../slicers/SliceFileByTilesDesired.py | 68 ++++++
.../granule_ingester/slicers/TileSlicer.py | 56 +++++
.../granule_ingester/slicers/__init__.py | 2 +
.../granule_ingester/writers/CassandraStore.py | 78 ++++++
.../granule_ingester/writers/DataStore.py | 13 +
.../granule_ingester/writers/MetadataStore.py | 11 +
.../granule_ingester/writers/SolrStore.py | 152 ++++++++++++
.../granule_ingester/writers/__init__.py | 4 +
granule_ingester/requirements.txt | 3 +
granule_ingester/setup.py | 34 +++
granule_ingester/tests/__init__.py | 0
.../tests/config_files/analysed_sst.yml | 16 ++
.../config_files/ingestion_config_testfile.yaml | 17 ++
...4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc | Bin 0 -> 1057327 bytes
granule_ingester/tests/granules/OBP_2017_01.nc | Bin 0 -> 2110135 bytes
granule_ingester/tests/granules/OBP_native_grid.nc | Bin 0 -> 1285094 bytes
.../SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 | Bin 0 -> 18672352 bytes
granule_ingester/tests/granules/THETA_199201.nc | Bin 0 -> 4255957 bytes
granule_ingester/tests/granules/empty_mur.nc4 | Bin 0 -> 60937 bytes
.../tests/granules/not_empty_ascatb.nc4 | Bin 0 -> 78036 bytes
.../tests/granules/not_empty_avhrr.nc4 | Bin 0 -> 49511 bytes
granule_ingester/tests/granules/not_empty_ccmp.nc | Bin 0 -> 206870 bytes
granule_ingester/tests/granules/not_empty_mur.nc4 | Bin 0 -> 60907 bytes
granule_ingester/tests/granules/not_empty_smap.h5 | Bin 0 -> 3000192 bytes
granule_ingester/tests/granules/not_empty_wswm.nc | Bin 0 -> 1041568 bytes
granule_ingester/tests/pipeline/__init__.py | 0
granule_ingester/tests/pipeline/test_Pipeline.py | 104 ++++++++
granule_ingester/tests/processors/__init__.py | 0
.../tests/processors/test_GenerateTileId.py | 22 ++
.../tests/reading_processors/__init__.py | 0
.../test_EccoReadingProcessor.py | 64 +++++
.../test_GridReadingProcessor.py | 265 +++++++++++++++++++++
.../test_SwathReadingProcessor.py | 74 ++++++
.../test_TileReadingProcessor.py | 29 +++
.../test_TimeSeriesReadingProcessor.py | 86 +++++++
granule_ingester/tests/slicers/__init__.py | 0
.../tests/slicers/test_SliceFileByDimension.py | 122 ++++++++++
.../tests/slicers/test_SliceFileByStepSize.py | 105 ++++++++
.../tests/slicers/test_SliceFileByTilesDesired.py | 88 +++++++
granule_ingester/tests/slicers/test_TileSlicer.py | 68 ++++++
granule_ingester/tests/writers/__init__.py | 0
granule_ingester/tests/writers/test_SolrStore.py | 54 +++++
74 files changed, 2790 insertions(+), 1 deletion(-)
diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index 9abd205..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1 +0,0 @@
-*.nc filter=lfs diff=lfs merge=lfs -text
diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
new file mode 100644
index 0000000..5408b74
--- /dev/null
+++ b/granule_ingester/.gitignore
@@ -0,0 +1,9 @@
+.vscode
+.idea
+*.egg-info
+*__pycache__
+*.pytest_cache
+*.code-workspace
+.DS_STORE
+build
+dist
\ No newline at end of file
diff --git a/granule_ingester/README.md b/granule_ingester/README.md
new file mode 100644
index 0000000..112f52d
--- /dev/null
+++ b/granule_ingester/README.md
@@ -0,0 +1,34 @@
+# SDAP Granule Ingester
+
+The SDAP Granule Ingester is a service that consumes YAML-formatted messages from a
+RabbitMQ queue; these messages are produced by the Collection Manager (`/collection_manager`
+in this repo). For each message consumed, this service reads a granule file (from local
+disk or from S3) and ingests it into SDAP by processing the granule and writing the
+resulting data to Cassandra and Solr.
+
+
+## Prerequisites
+
+Python 3.7
+
+## Building the service
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+ $ python setup.py install
+
+
+## Launching the service
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+ $ python granule_ingester/main.py -h
+
+## Running the tests
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+ $ pip install pytest
+ $ pytest
+
+## Building the Docker image
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+ $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
new file mode 100644
index 0000000..b2af149
--- /dev/null
+++ b/granule_ingester/conda-requirements.txt
@@ -0,0 +1,10 @@
+numpy==1.15.4
+scipy
+netcdf4==1.5.3
+pytz==2019.3
+xarray
+pyyaml==5.3.1
+requests==2.23.0
+aiohttp==3.6.2
+aio-pika
+tenacity
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
new file mode 100644
index 0000000..4b25318
--- /dev/null
+++ b/granule_ingester/docker/Dockerfile
@@ -0,0 +1,21 @@
# Granule ingester image. Build from the granule_ingester directory
# (docker build . -f docker/Dockerfile), so the COPY sources below are relative to it.
FROM continuumio/miniconda3:4.8.2-alpine

USER root

ENV PATH="/opt/conda/bin:$PATH"

# git and openjdk8 are used by install_nexusproto.sh (git checkout + Gradle build). They are grouped
# under the .build-deps virtual package so they can be removed again once the build is done.
RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8

COPY /granule_ingester /sdap/granule_ingester
COPY /setup.py /sdap/setup.py
COPY /requirements.txt /sdap/requirements.txt
COPY /conda-requirements.txt /sdap/conda-requirements.txt
COPY /docker/install_nexusproto.sh /install_nexusproto.sh
COPY /docker/entrypoint.sh /entrypoint.sh

# The relative path assumes the working directory is / (where the script was copied); this file
# sets no WORKDIR, so it depends on the base image's default -- TODO confirm.
RUN ./install_nexusproto.sh
RUN cd /sdap && python setup.py install

# NOTE(review): removing the build deps in a separate layer hides them from the final filesystem but
# does not reduce the image size, because the earlier layer still contains them.
RUN apk del .build-deps

ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
new file mode 100644
index 0000000..e6f7262
--- /dev/null
+++ b/granule_ingester/docker/entrypoint.sh
@@ -0,0 +1,10 @@
#!/bin/sh

# Start the granule ingester. Each environment variable below is forwarded as the
# matching main.py option only when it is set and non-empty; otherwise main.py's
# default for that option applies.
#
# ${VAR:+word} is POSIX parameter expansion, so this works under any /bin/sh
# without bash-only [[ ]] tests. The expansions are unquoted, so a value that
# contains whitespace is split into separate arguments.
python /sdap/granule_ingester/main.py \
  ${RABBITMQ_HOST:+--rabbitmq_host=$RABBITMQ_HOST} \
  ${RABBITMQ_USERNAME:+--rabbitmq_username=$RABBITMQ_USERNAME} \
  ${RABBITMQ_PASSWORD:+--rabbitmq_password=$RABBITMQ_PASSWORD} \
  ${RABBITMQ_QUEUE:+--rabbitmq_queue=$RABBITMQ_QUEUE} \
  ${CASSANDRA_CONTACT_POINTS:+--cassandra_contact_points=$CASSANDRA_CONTACT_POINTS} \
  ${CASSANDRA_PORT:+--cassandra_port=$CASSANDRA_PORT} \
  ${SOLR_HOST_AND_PORT:+--solr_host_and_port=$SOLR_HOST_AND_PORT}
diff --git a/granule_ingester/docker/install_nexusproto.sh b/granule_ingester/docker/install_nexusproto.sh
new file mode 100755
index 0000000..7ba2cee
--- /dev/null
+++ b/granule_ingester/docker/install_nexusproto.sh
@@ -0,0 +1,20 @@
#!/bin/sh
# Fetch the nexusproto sources, run its Gradle install tasks, then delete the
# checkout and the Gradle cache.
#
# Usage: install_nexusproto.sh [GIT_REPO] [GIT_BRANCH]
#   GIT_REPO   defaults to the Apache incubator-sdap-nexusproto repository
#   GIT_BRANCH defaults to "master"
set -e

APACHE_NEXUSPROTO="https://github.com/apache/incubator-sdap-nexusproto.git"
MASTER="master"

GIT_REPO=${1:-$APACHE_NEXUSPROTO}
GIT_BRANCH=${2:-$MASTER}

mkdir nexusproto
cd nexusproto
git init
git pull "${GIT_REPO}" "${GIT_BRANCH}"

# Gradle tasks defined by the nexusproto build; pythonInstall presumably installs
# the Python bindings imported as `nexusproto` -- verify against that repo.
./gradlew pythonInstall --info

./gradlew install --info

# Clean up the Gradle cache and the checkout so they do not stay in the image.
rm -rf /root/.gradle
cd ..
rm -rf nexusproto
diff --git a/granule_ingester/granule_ingester/__init__.py b/granule_ingester/granule_ingester/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
new file mode 100644
index 0000000..75d347a
--- /dev/null
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import aio_pika
+
+from granule_ingester.healthcheck import HealthCheck
+from granule_ingester.pipeline import Pipeline
+
+logger = logging.getLogger(__name__)
+
+
class Consumer(HealthCheck):
    """Consume ingestion jobs from a RabbitMQ queue and run a Pipeline for each one.

    Enter the consumer as an async context manager (which opens the RabbitMQ
    connection) before calling ``start_consuming``. Each message body is decoded
    as UTF-8 and passed to ``Pipeline.from_string`` together with the data and
    metadata store factories.
    """

    def __init__(self,
                 rabbitmq_host,
                 rabbitmq_username,
                 rabbitmq_password,
                 rabbitmq_queue,
                 data_store_factory,
                 metadata_store_factory):
        """
        :param rabbitmq_host: RabbitMQ host to connect to.
        :param rabbitmq_username: RabbitMQ username.
        :param rabbitmq_password: RabbitMQ password.
        :param rabbitmq_queue: name of the durable queue to consume from.
        :param data_store_factory: zero-argument callable that builds a data store; handed to each Pipeline.
        :param metadata_store_factory: zero-argument callable that builds a metadata store; handed to each Pipeline.
        """
        self._rabbitmq_queue = rabbitmq_queue
        self._data_store_factory = data_store_factory
        self._metadata_store_factory = metadata_store_factory

        self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                               password=rabbitmq_password,
                                                                               host=rabbitmq_host)
        # Same URL with the password masked; this is the only form that may be written to logs.
        self._redacted_connection_string = "amqp://{username}:***@{host}/".format(username=rabbitmq_username,
                                                                                 host=rabbitmq_host)
        self._connection = None

    async def health_check(self) -> bool:
        """Return True if a RabbitMQ connection can be opened and closed, otherwise False."""
        try:
            connection = await self._get_connection()
            await connection.close()
            return True
        except Exception:
            # Log the redacted URL: the real connection string contains the password.
            logger.exception("Cannot connect to RabbitMQ! Connection string was {}".format(
                self._redacted_connection_string))
            return False

    async def _get_connection(self):
        """Open a new robust (auto-reconnecting) aio_pika connection."""
        return await aio_pika.connect_robust(self._connection_string)

    async def __aenter__(self):
        self._connection = await self._get_connection()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._connection:
            await self._connection.close()
            self._connection = None

    @staticmethod
    async def _received_message(message: aio_pika.IncomingMessage,
                                data_store_factory,
                                metadata_store_factory):
        """Run the pipeline described by one message; ack on success, reject and requeue on failure."""
        logger.info("Received a job from the queue. Starting pipeline.")
        try:
            config_str = message.body.decode("utf-8")
            logger.debug(config_str)
            pipeline = Pipeline.from_string(config_str=config_str,
                                            data_store_factory=data_store_factory,
                                            metadata_store_factory=metadata_store_factory)
            await pipeline.run()
            message.ack()
        except Exception as e:
            # NOTE(review): every failure is requeued, so a message that can never succeed
            # (e.g. a malformed config) will be redelivered indefinitely.
            message.reject(requeue=True)
            # logger.exception also records the traceback, which str(e) alone loses.
            logger.exception("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))

    async def start_consuming(self):
        """Consume messages from the configured queue one at a time (prefetch 1), forever.

        Requires the connection opened by entering the consumer's async context.
        """
        channel = await self._connection.channel()
        # Prefetch one message so a worker never holds more than the job it is processing.
        await channel.set_qos(prefetch_count=1)
        queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
diff --git a/granule_ingester/granule_ingester/consumer/__init__.py b/granule_ingester/granule_ingester/consumer/__init__.py
new file mode 100644
index 0000000..35d075b
--- /dev/null
+++ b/granule_ingester/granule_ingester/consumer/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.consumer.Consumer import Consumer
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
new file mode 100644
index 0000000..c28ffbb
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+from urllib import parse
+
+import aioboto3
+import xarray as xr
+
+logger = logging.getLogger(__name__)
+
+
class GranuleLoader:
    """Async context manager that opens a granule file as an xarray Dataset.

    ``resource`` is either a local path (no URL scheme) or an ``s3://bucket/key``
    URL; S3 objects are first downloaded to a named temporary file. Entering the
    context yields ``(dataset, granule_name)``, where ``granule_name`` is the
    basename of ``resource``. Leaving it closes the dataset and closes (and thereby
    deletes) any temporary file.
    """

    def __init__(self, resource: str, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._granule_temp_file = None
        self._dataset = None
        self._resource = resource

    async def __aenter__(self):
        return await self.open()

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Close the dataset before the temporary file it may still be reading from.
        if self._dataset is not None:
            self._dataset.close()
            self._dataset = None
        if self._granule_temp_file:
            self._granule_temp_file.close()
            self._granule_temp_file = None

    async def open(self) -> (xr.Dataset, str):
        """Open the granule and return ``(dataset, granule_name)``.

        :raises RuntimeError: if the resource URL scheme is neither empty nor ``s3``.
        """
        resource_url = parse.urlparse(self._resource)
        if resource_url.scheme == 's3':
            # We need to save a reference to the temporary granule file so we can delete it when the context manager
            # closes. The file needs to be kept around until nothing is reading the dataset anymore.
            self._granule_temp_file = await self._download_s3_file(self._resource)
            file_path = self._granule_temp_file.name
        elif resource_url.scheme == '':
            file_path = self._resource
        else:
            raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))

        granule_name = os.path.basename(self._resource)
        try:
            self._dataset = xr.open_dataset(file_path, lock=False)
        except Exception:
            # __aexit__ is not called when __aenter__ fails, so release the download here.
            if self._granule_temp_file:
                self._granule_temp_file.close()
                self._granule_temp_file = None
            raise
        return self._dataset, granule_name

    @staticmethod
    async def _download_s3_file(url: str):
        """Download the S3 object at ``url`` into a NamedTemporaryFile and return the open file.

        The file is deleted when it is closed.
        """
        parsed_url = parse.urlparse(url)
        logger.info(
            "Downloading S3 file from bucket '{}' with key '{}'".format(parsed_url.hostname, parsed_url.path[1:]))
        async with aioboto3.resource("s3") as s3:
            obj = await s3.Object(bucket_name=parsed_url.hostname, key=parsed_url.path[1:])
            response = await obj.get()
            data = await response['Body'].read()
        logger.info("Finished downloading S3 file.")

        fp = tempfile.NamedTemporaryFile()
        fp.write(data)
        # The file is reopened by name (xr.open_dataset), which only sees bytes handed to the OS;
        # without a flush, some or all of the data can still be sitting in Python's write buffer.
        fp.flush()
        logger.info("Saved downloaded file to {}.".format(fp.name))
        return fp
diff --git a/granule_ingester/granule_ingester/granule_loaders/__init__.py b/granule_ingester/granule_ingester/granule_loaders/__init__.py
new file mode 100644
index 0000000..5df1cb0
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.granule_loaders.GranuleLoader import GranuleLoader
diff --git a/granule_ingester/granule_ingester/healthcheck/HealthCheck.py b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py
new file mode 100644
index 0000000..390c573
--- /dev/null
+++ b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
class HealthCheck(ABC):
    """Interface for external dependencies whose availability can be probed at startup."""

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True when the dependency is healthy, False otherwise."""
diff --git a/granule_ingester/granule_ingester/healthcheck/__init__.py b/granule_ingester/granule_ingester/healthcheck/__init__.py
new file mode 100644
index 0000000..f343c01
--- /dev/null
+++ b/granule_ingester/granule_ingester/healthcheck/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.healthcheck.HealthCheck import HealthCheck
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
new file mode 100644
index 0000000..29795f7
--- /dev/null
+++ b/granule_ingester/granule_ingester/main.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+from functools import partial
+from typing import List
+
+from granule_ingester.consumer import Consumer
+from granule_ingester.healthcheck import HealthCheck
+from granule_ingester.writers import CassandraStore
+from granule_ingester.writers import SolrStore
+
+
def cassandra_factory(contact_points, port):
    """Build a CassandraStore for the given contact points and port, connect it, and return it."""
    cassandra_store = CassandraStore(contact_points, port)
    cassandra_store.connect()
    return cassandra_store
+
+
def solr_factory(solr_host_and_port):
    """Build a SolrStore for ``solr_host_and_port``, connect it, and return it."""
    solr_store = SolrStore(solr_host_and_port)
    solr_store.connect()
    return solr_store
+
+
async def run_health_checks(dependencies: List[HealthCheck]):
    """Return True only if every dependency passes its health check.

    Checks run sequentially in list order and stop at the first failure, so
    dependencies after a failing one are not checked.
    """
    all_healthy = True
    for dep in dependencies:
        if not await dep.health_check():
            all_healthy = False
            break
    return all_healthy
+
+
async def main():
    """Parse command-line options, health-check dependencies, then consume ingestion jobs.

    Cassandra, Solr and RabbitMQ are health-checked first. Only if all of them pass
    is the RabbitMQ connection opened and the queue consumed; otherwise an error is
    logged and the coroutine returns.
    """
    parser = argparse.ArgumentParser(description='Run the SDAP granule ingester service.')
    parser.add_argument('--rabbitmq_host',
                        default='localhost',
                        metavar='HOST',
                        help='RabbitMQ hostname to connect to. (Default: "localhost")')
    parser.add_argument('--rabbitmq_username',
                        default='guest',
                        metavar='USERNAME',
                        help='RabbitMQ username. (Default: "guest")')
    parser.add_argument('--rabbitmq_password',
                        default='guest',
                        metavar='PASSWORD',
                        help='RabbitMQ password. (Default: "guest")')
    parser.add_argument('--rabbitmq_queue',
                        default="nexus",
                        metavar="QUEUE",
                        help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
    parser.add_argument('--cassandra_contact_points',
                        default=['localhost'],
                        metavar="HOST",
                        nargs='+',
                        help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")')
    # type=int so a port given on the command line is an int, matching the default,
    # instead of being passed to the Cassandra store as a string.
    parser.add_argument('--cassandra_port',
                        default=9042,
                        type=int,
                        metavar="PORT",
                        help='Cassandra port. (Default: 9042)')
    parser.add_argument('--solr_host_and_port',
                        default='http://localhost:8983',
                        metavar='HOST:PORT',
                        help='Solr host and port. (Default: http://localhost:8983)')
    parser.add_argument('-v',
                        '--verbose',
                        action='store_true',
                        help='Print verbose logs.')

    args = parser.parse_args()

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=logging_level)
    # Apply the chosen level to every logger registered so far, including third-party ones.
    existing_loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
    for existing_logger in existing_loggers:
        existing_logger.setLevel(logging_level)

    logger = logging.getLogger(__name__)

    # Mask the RabbitMQ password so it is never written to the logs.
    config_values_str = "\n".join(["{} = {}".format(arg, '***' if arg == 'rabbitmq_password' else getattr(args, arg))
                                   for arg in vars(args)])
    logger.info("Using configuration values:\n{}".format(config_values_str))

    cassandra_contact_points = args.cassandra_contact_points
    cassandra_port = args.cassandra_port
    solr_host_and_port = args.solr_host_and_port

    consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                        rabbitmq_username=args.rabbitmq_username,
                        rabbitmq_password=args.rabbitmq_password,
                        rabbitmq_queue=args.rabbitmq_queue,
                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
    if await run_health_checks(
            [CassandraStore(cassandra_contact_points, cassandra_port),
             SolrStore(solr_host_and_port),
             consumer]):
        async with consumer:
            logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
            await consumer.start_consuming()
    else:
        logger.error("Quitting because not all dependencies passed the health checks.")
+
+
# Script entry point (docker/entrypoint.sh runs this file directly): run main() via asyncio.run.
if __name__ == '__main__':
    asyncio.run(main())
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
new file mode 100644
index 0000000..2cf2245
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -0,0 +1,15 @@
+from granule_ingester.processors import *
+from granule_ingester.processors.reading_processors import *
+from granule_ingester.slicers import *
+from granule_ingester.granule_loaders import *
+
# Maps the ``name`` given in an ingestion config entry to the class instantiated for it.
# Pipeline._parse_module resolves slicer and processor names through this mapping.
modules = dict(
    granule=GranuleLoader,
    sliceFileByStepSize=SliceFileByStepSize,
    generateTileId=GenerateTileId,
    EccoReadingProcessor=EccoReadingProcessor,
    GridReadingProcessor=GridReadingProcessor,
    tileSummary=TileSummarizingProcessor,
    emptyTileFilter=EmptyTileFilter,
    kelvinToCelsius=KelvinToCelsius,
)
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
new file mode 100644
index 0000000..8f2dd6f
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import time
+from typing import List
+
+import aiomultiprocess
+import xarray as xr
+import yaml
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.processors.TileProcessor import TileProcessor
+from granule_ingester.slicers import TileSlicer
+from granule_ingester.writers import DataStore, MetadataStore
+
+logger = logging.getLogger(__name__)
+
# Largest number of tiles handed to the worker pool in one pool.map call. Pipeline.run batches
# tiles into chunks of this size because, per its note, the multiprocessing queue underlying
# aiomultiprocess cannot hold more than 2**15-1 tasks.
MAX_QUEUE_SIZE = 2 ** 15 - 1

# Per-worker-process state: assigned by _init_worker (used as the pool initializer in
# Pipeline.run) and read by _process_tile_in_worker for every tile that worker handles.
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+
+
def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
    """Install the per-process state a pool worker needs to process tiles.

    Calls ``data_store_factory()`` and then ``metadata_store_factory()`` to build
    this process's stores, and records the processor list and dataset in module
    globals for ``_process_tile_in_worker``.
    """
    global _worker_data_store, _worker_metadata_store, _worker_processor_list, _worker_dataset

    # The stores open their own TCP sockets in each worker process; the OS closes
    # them when the worker process exits, so they are not closed explicitly.
    _worker_data_store = data_store_factory()
    _worker_metadata_store = metadata_store_factory()
    _worker_processor_list, _worker_dataset = processor_list, dataset
+
+
async def _process_tile_in_worker(serialized_input_tile: str):
    """Deserialize one tile, run it through this worker's processors, and persist the result.

    Uses the module-level state installed by _init_worker. A tile that some processor
    drops (falsy result) is not saved.
    """
    # The worker globals are only read here, so no `global` declaration is required.
    tile = _recurse(_worker_processor_list,
                    _worker_dataset,
                    nexusproto.NexusTile.FromString(serialized_input_tile))
    if not tile:
        return
    await _worker_data_store.save_data(tile)
    await _worker_metadata_store.save_metadata(tile)
+
+
def _recurse(processor_list: List[TileProcessor],
             dataset: xr.Dataset,
             input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
    """Pass ``input_tile`` through each processor in order and return the final tile.

    Each processor is called as ``process(tile=..., dataset=dataset)``. As soon as a
    processor returns a falsy value (e.g. EmptyTileFilter returning None), the chain
    stops and None is returned. With no processors, ``input_tile`` is returned as-is.
    """
    current_tile = input_tile
    for processor in processor_list:
        current_tile = processor.process(tile=current_tile, dataset=dataset)
        if not current_tile:
            return None
    return current_tile
+
+
class Pipeline:
    """Ingest one granule: load it, slice it into tiles, process the tiles and store them.

    Tile processing and storage run in an aiomultiprocess worker pool; each worker
    builds its own data and metadata stores from the given factories.
    """

    def __init__(self,
                 granule_loader: GranuleLoader,
                 slicer: TileSlicer,
                 data_store_factory,
                 metadata_store_factory,
                 tile_processors: List[TileProcessor]):
        self._granule_loader = granule_loader
        self._tile_processors = tile_processors
        self._slicer = slicer
        self._data_store_factory = data_store_factory
        self._metadata_store_factory = metadata_store_factory

    @classmethod
    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
        """Build a pipeline from a YAML configuration string (e.g. a queue message body)."""
        # NOTE(security): config_str can come straight from the message queue. PyYAML's FullLoader
        # allowed arbitrary code execution before 5.4 (CVE-2020-14343; conda-requirements pins 5.3.1).
        # The config only needs plain mappings/lists/scalars, so yaml.safe_load should be preferred
        # once it is confirmed no config relies on Python-specific YAML tags.
        config = yaml.load(config_str, yaml.FullLoader)
        return cls._build_pipeline(config,
                                   data_store_factory,
                                   metadata_store_factory,
                                   processor_module_mappings)

    @classmethod
    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
        """Build a pipeline from a YAML configuration file."""
        with open(config_path) as config_file:
            config = yaml.load(config_file, yaml.FullLoader)
        return cls._build_pipeline(config,
                                   data_store_factory,
                                   metadata_store_factory,
                                   processor_module_mappings)

    @classmethod
    def _build_pipeline(cls,
                        config: dict,
                        data_store_factory,
                        metadata_store_factory,
                        module_mappings: dict):
        """Construct a pipeline from a parsed configuration dict.

        :param config: mapping with a ``granule`` entry (keyword arguments for GranuleLoader),
            a ``slicer`` entry and a ``processors`` list; the slicer and every processor entry
            are module configs as accepted by ``_parse_module``.
        :param module_mappings: mapping from module ``name`` to the class to instantiate.
        """
        granule_loader = GranuleLoader(**config['granule'])

        slicer_config = config['slicer']
        slicer = cls._parse_module(slicer_config, module_mappings)

        tile_processors = []
        for processor_config in config['processors']:
            module = cls._parse_module(processor_config, module_mappings)
            tile_processors.append(module)

        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)

    @classmethod
    def _parse_module(cls, module_config: dict, module_mappings: dict):
        """Instantiate the module named by ``module_config['name']``.

        The remaining keys of ``module_config`` are passed to the class constructor.
        ``module_config`` itself is left unmodified.

        :raises KeyError: if ``module_config`` has no ``name`` key.
        :raises RuntimeError: if the name is not present in ``module_mappings``.
        """
        # Work on a copy so the caller's configuration is not mutated by the pop below.
        module_kwargs = dict(module_config)
        module_name = module_kwargs.pop('name')
        try:
            module_class = module_mappings[module_name]
        except KeyError:
            raise RuntimeError("'{}' is not a valid processor.".format(module_name)) from None
        logger.debug("Loaded processor {}.".format(module_class))
        # Constructed outside the try block: errors raised by the constructor (including
        # KeyErrors) propagate unchanged instead of being reported as an unknown name.
        return module_class(**module_kwargs)

    async def run(self):
        """Open the granule, slice it into tiles, and process/store the tiles in a worker pool."""
        async with self._granule_loader as (dataset, granule_name):
            start = time.perf_counter()
            async with aiomultiprocess.Pool(initializer=_init_worker,
                                            initargs=(self._tile_processors,
                                                      dataset,
                                                      self._data_store_factory,
                                                      self._metadata_store_factory)) as pool:
                # Tiles are handed to workers as serialized protobuf bytes.
                serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                    self._slicer.generate_tiles(dataset, granule_name)]
                # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
                    await pool.map(_process_tile_in_worker, chunk)

        end = time.perf_counter()
        logger.info("Pipeline finished in {} seconds".format(end - start))

    @staticmethod
    def _chunk_list(items, chunk_size):
        """Split ``items`` into consecutive slices of at most ``chunk_size`` elements."""
        return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/pipeline/__init__.py b/granule_ingester/granule_ingester/pipeline/__init__.py
new file mode 100644
index 0000000..7346aa7
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/__init__.py
@@ -0,0 +1,2 @@
+from granule_ingester.pipeline.Pipeline import Pipeline
+from granule_ingester.pipeline.Modules import modules
diff --git a/granule_ingester/granule_ingester/processors/EmptyTileFilter.py b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
new file mode 100644
index 0000000..4f012f5
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+logger = logging.getLogger(__name__)
+
+
def parse_input(nexus_tile_data):
    """Deserialize a NexusTile protobuf message from its serialized form."""
    nexus_tile = nexusproto.NexusTile.FromString(nexus_tile_data)
    return nexus_tile
+
+
class EmptyTileFilter(TileProcessor):
    """Tile processor that drops tiles whose variable data contains no non-NaN values."""

    def process(self, tile, *args, **kwargs):
        """Return ``tile`` if its data holds at least one non-NaN value; otherwise log and return None."""
        # The tile payload lives in whichever field of the "tile_type" oneof is set.
        payload = getattr(tile.tile, tile.tile.WhichOneof("tile_type"))
        values = from_shaped_array(payload.variable_data)

        nan_count = numpy.count_nonzero(numpy.isnan(values))
        if values.size > nan_count:
            return tile

        logger.warning("Discarding tile from {} because it is empty".format(tile.summary.granule))
        return None
diff --git a/granule_ingester/granule_ingester/processors/GenerateTileId.py b/granule_ingester/granule_ingester/processors/GenerateTileId.py
new file mode 100644
index 0000000..2d965f7
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/GenerateTileId.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
class GenerateTileId(TileProcessor):
    """Tile processor that assigns a deterministic, name-based id to each tile."""

    def process(self, tile: nexusproto.NexusTile, *args, **kwargs):
        """Set ``tile.summary.tile_id`` to a UUIDv3 string and return the same tile.

        The UUID name is the granule's base file name (directory part stripped),
        followed by the data variable name and the section spec, concatenated with
        no separator. The same section of the same granule therefore always maps
        to the same id.
        """
        summary = tile.summary
        name_seed = "".join((os.path.basename(summary.granule),
                             summary.data_var_name,
                             summary.section_spec))
        summary.tile_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, name_seed))
        return tile
diff --git a/granule_ingester/granule_ingester/processors/TileProcessor.py b/granule_ingester/granule_ingester/processors/TileProcessor.py
new file mode 100644
index 0000000..d62c504
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/TileProcessor.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+# TODO: make this an informal interface, not an abstract class
class TileProcessor(ABC):
    """Abstract base for every step that operates on a tile in the ingestion pipeline."""

    @abstractmethod
    def process(self, tile, *args, **kwargs):
        """Process ``tile``; concrete subclasses define what is returned."""
diff --git a/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
new file mode 100644
index 0000000..1fe5d7d
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
class NoTimeException(Exception):
    """Raised when a tile's time field cannot be interpreted as a time or time range."""
+
+
def find_time_min_max(tile_data):
    """Return ``(min_time, max_time)`` for the ``time`` field of a tile payload.

    ``tile_data.time`` is either a ``ShapedArray`` of per-element times, as written
    by the swath and time-series reading processors, or a scalar ``int``, as written
    by the grid and ECCO reading processors.

    NOTE(review): a falsy time (e.g. an int time of 0) is treated as "no time".

    :raises NoTimeException: if the time is falsy or of neither supported type.
    """
    time_field = tile_data.time
    if not time_field:
        raise NoTimeException

    if isinstance(time_field, nexusproto.ShapedArray):
        times = from_shaped_array(time_field)
        earliest = int(numpy.nanmin(times).item())
        latest = int(numpy.nanmax(times).item())
        return earliest, latest

    if isinstance(time_field, int):
        return time_field, time_field

    raise NoTimeException
+
+
class TileSummarizingProcessor(TileProcessor):
    """Tile processor that fills in a tile's summary: dataset name, bounding box, stats and time range."""

    def __init__(self, dataset_name: str, *args, **kwargs):
        """
        :param dataset_name: value written to ``summary.dataset_name`` of every processed tile.
        """
        super().__init__(*args, **kwargs)
        self._dataset_name = dataset_name

    def process(self, tile, *args, **kwargs):
        """Compute summary information for ``tile`` and store it in ``tile.summary``.

        Bounding box and min/max/count ignore NaN values. The mean is cosine-of-latitude
        weighted for swath and grid tiles and unweighted for any other tile type.
        ``min_time``/``max_time`` are set only when ``find_time_min_max`` finds a time.
        """
        tile_type = tile.tile.WhichOneof("tile_type")
        tile_data = getattr(tile.tile, tile_type)

        lats = numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude))
        lons = numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude))
        values = from_shaped_array(tile_data.variable_data)

        if tile.HasField("summary"):
            summary = tile.summary
        else:
            summary = nexusproto.TileSummary()

        summary.dataset_name = self._dataset_name

        bbox = summary.bbox
        bbox.lat_min = numpy.nanmin(lats).item()
        bbox.lat_max = numpy.nanmax(lats).item()
        bbox.lon_min = numpy.nanmin(lons).item()
        bbox.lon_max = numpy.nanmax(lons).item()

        stats = summary.stats
        stats.min = numpy.nanmin(values).item()
        stats.max = numpy.nanmax(values).item()
        stats.count = values.size - numpy.count_nonzero(numpy.isnan(values))

        # The mean is weighted by cos(latitude); how weights line up with data depends on the tile layout.
        if tile_type == 'swath_tile':
            # Swath data and latitudes are element-aligned, so each value gets its own weight.
            stats.mean = self.calculate_mean_for_swath_tile(values, lats)
        elif tile_type == 'grid_tile':
            # Grid weights are repeated across longitudes.
            # TODO This assumes data axes are ordered as latitude x longitude
            stats.mean = self.calculate_mean_for_grid_tile(values, lats, lons)
        else:
            # Any other tile type: plain NaN-ignoring average, no weighting.
            stats.mean = numpy.nanmean(values).item()

        try:
            stats.min_time, stats.max_time = find_time_min_max(tile_data)
        except NoTimeException:
            pass

        tile.summary.CopyFrom(summary)
        return tile

    @staticmethod
    def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes):
        """Return the cos(latitude)-weighted mean of grid data, ignoring NaN/inf values.

        Each latitude's weight is repeated ``len(longitudes)`` times to match the flattened
        data. This presumes 1-D coordinates and row-major (latitude, longitude) data; TODO confirm.
        """
        lat_per_cell = numpy.repeat(latitudes, len(longitudes))
        cell_weights = numpy.cos(numpy.radians(lat_per_cell))
        masked_values = numpy.ma.masked_invalid(variable_data).flatten()
        return numpy.ma.average(masked_values, weights=cell_weights).item()

    @staticmethod
    def calculate_mean_for_swath_tile(variable_data, latitudes):
        """Return the cos(latitude)-weighted mean of swath data (element-aligned weights), ignoring NaN/inf."""
        cell_weights = numpy.cos(numpy.radians(latitudes))
        masked_values = numpy.ma.masked_invalid(variable_data)
        return numpy.ma.average(masked_values, weights=cell_weights).item()
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
new file mode 100644
index 0000000..592d8ea
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -0,0 +1,5 @@
+from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
+from granule_ingester.processors.GenerateTileId import GenerateTileId
+from granule_ingester.processors.TileProcessor import TileProcessor
+from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
new file mode 100644
index 0000000..e728418
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
class KelvinToCelsius(TileProcessor):
    """Tile processor that converts a tile's variable data from kelvin to degrees Celsius."""

    def process(self, tile, *args, **kwargs):
        """Subtract 273.15 from every value in the tile's ``variable_data``, in place; return the same tile."""
        payload = getattr(tile.tile, tile.tile.WhichOneof("tile_type"))
        celsius = from_shaped_array(payload.variable_data) - 273.15
        payload.variable_data.CopyFrom(to_shaped_array(celsius))
        return tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
new file mode 100644
index 0000000..1876013
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
class EccoReadingProcessor(TileReadingProcessor):
    """Reading processor that builds an ``EccoTile`` from one section of an ECCO granule."""

    def __init__(self,
                 variable_to_read,
                 latitude,
                 longitude,
                 tile,
                 depth=None,
                 time=None,
                 **kwargs):
        """
        :param variable_to_read: name of the data variable to read.
        :param latitude: name of the latitude variable.
        :param longitude: name of the longitude variable.
        :param tile: name of the ECCO tile variable. The same name is used as a dimension
            key in the section spec.
        :param depth: optional name of the depth variable.
        :param time: optional name of the time variable. It is also used as the time
            dimension key in the section spec.
        """
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        self.depth = depth
        self.time = time
        self.tile = tile

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill ``input_tile.tile.ecco_tile`` with the section of ``ds`` selected by ``dimensions_to_slices``.

        Latitude, longitude and variable data are squeezed (size-1 dimensions dropped), and
        masked values become NaN.

        :raises RuntimeError: if a configured depth or time slice covers more than one element.
        """
        new_tile = nexusproto.EccoTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan, not np.NaN: the NaN alias was removed in NumPy 2.0.
        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.nan)
        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.nan)

        data_subset = ds[self.variable_to_read][
            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
        data_subset = np.ma.filled(np.squeeze(data_subset), np.nan)

        # Store the value of the tile variable at the first index of this section's tile slice.
        new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()

        if self.depth:
            # Only the first dimension of the depth variable is checked and used.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            time_slice = dimensions_to_slices[self.time]
            time_slice_len = time_slice.stop - time_slice.start
            if time_slice_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_slice_len))
            # Assumes time decodes to nanosecond datetime64, so /1e9 yields epoch seconds -- TODO confirm.
            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))

        input_tile.tile.ecco_tile.CopyFrom(new_tile)
        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
new file mode 100644
index 0000000..4354f9e
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -0,0 +1,53 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
class GridReadingProcessor(TileReadingProcessor):
    """Reading processor that builds a ``GridTile`` from one section of a gridded granule."""

    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
        """
        :param variable_to_read: name of the data variable to read.
        :param latitude: name of the latitude variable.
        :param longitude: name of the longitude variable.
        :param depth: optional name of the depth variable.
        :param time: optional name of the time variable. It is also used as the time
            dimension key in the section spec.
        """
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill ``input_tile.tile.grid_tile`` with the section of ``ds`` selected by ``dimensions_to_slices``.

        Latitude, longitude and variable data are squeezed (size-1 dimensions dropped), and
        masked values become NaN.

        :raises RuntimeError: if a configured depth or time slice covers more than one element.
        """
        new_tile = nexusproto.GridTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan, not np.NaN: the NaN alias was removed in NumPy 2.0.
        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.nan)
        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(np.squeeze(data_subset), np.nan)

        if self.depth:
            # Only the first dimension of the depth variable is checked and used.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            time_slice = dimensions_to_slices[self.time]
            time_slice_len = time_slice.stop - time_slice.start
            if time_slice_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_slice_len))
            # Assumes time decodes to nanosecond datetime64, so /1e9 yields epoch seconds -- TODO confirm.
            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))

        input_tile.tile.grid_tile.CopyFrom(new_tile)
        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
new file mode 100644
index 0000000..fec28ca
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -0,0 +1,47 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
class SwathReadingProcessor(TileReadingProcessor):
    """Reading processor that builds a ``SwathTile`` from one section of a swath granule."""

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        """
        :param variable_to_read: name of the data variable to read.
        :param latitude: name of the latitude variable.
        :param longitude: name of the longitude variable.
        :param time: name of the time variable (required for swaths).
        :param depth: optional name of the depth variable.
        """
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill ``input_tile.tile.swath_tile`` with the section of ``ds`` selected by ``dimensions_to_slices``.

        Arrays are not squeezed; masked values become NaN. Times are converted with
        ``_convert_to_timestamp``.

        :raises RuntimeError: if a configured depth slice covers more than one element.
        """
        new_tile = nexusproto.SwathTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan, not np.NaN: the NaN alias was removed in NumPy 2.0.
        lat_subset = np.ma.filled(lat_subset, np.nan)
        lon_subset = np.ma.filled(lon_subset, np.nan)

        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(data_subset, np.nan)

        if self.depth:
            # Only the first dimension of the depth variable is checked and used.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
        new_tile.time.CopyFrom(to_shaped_array(time_subset))
        input_tile.tile.swath_tile.CopyFrom(new_tile)
        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
new file mode 100644
index 0000000..14a44f5
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from abc import ABC, abstractmethod
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
class TileReadingProcessor(TileProcessor, ABC):
    """Base class for processors that read one section of a granule's dataset into a tile.

    Subclasses implement ``_generate_tile`` to populate the tile payload for their tile type.
    """

    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
        """
        :param variable_to_read: name of the data variable to read.
        :param latitude: name of the latitude variable.
        :param longitude: name of the longitude variable.
        """
        self.variable_to_read = variable_to_read
        self.latitude = latitude
        self.longitude = longitude

        # Optional settings shared by reading processors; left unset by this class.
        self.temp_dir = None
        self.metadata = None

    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
        """Read the section named by ``tile.summary.section_spec`` from ``dataset``.

        The incoming tile is copied rather than mutated. The copy gets ``data_var_name``
        set and is then filled in by ``_generate_tile``.
        """
        slices_by_dim = type(self)._convert_spec_to_slices(tile.summary.section_spec)

        result_tile = nexusproto.NexusTile()
        result_tile.CopyFrom(tile)
        result_tile.summary.data_var_name = self.variable_to_read

        return self._generate_tile(dataset, slices_by_dim, result_tile)

    @abstractmethod
    def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
        """Populate and return ``tile`` using the ``dataset`` section selected by ``dimensions_to_slices``."""

    @classmethod
    def _parse_input(cls, the_input_tile, temp_dir):
        """Return ``(dimension -> slice mapping, granule path with any 'file:' prefix removed)``."""
        summary = the_input_tile.summary
        tile_specifications = cls._convert_spec_to_slices(summary.section_spec)

        file_path = summary.granule
        if file_path.startswith('file:'):
            file_path = file_path[len('file:'):]

        return tile_specifications, file_path

    @staticmethod
    def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]:
        """Select the slices for exactly the dimensions of ``variable``, in its dimension order."""
        return dict((dim, dimension_to_slice[dim]) for dim in variable.dims)

    @staticmethod
    def _convert_spec_to_slices(spec):
        """Parse a spec such as ``"lat:0:10,lon:5:6"`` into ``{"lat": slice(0, 10), "lon": slice(5, 6)}``."""
        parts = (chunk.split(':') for chunk in spec.split(','))
        return {name: slice(int(start), int(stop)) for name, start, stop in parts}

    @staticmethod
    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
        """Convert datetime64 times to integer seconds since 1970-01-01.

        float32 input is returned unchanged. Other input is assumed to be nanosecond
        datetime64, so dividing by 1e9 gives seconds; TODO confirm.
        """
        if times.dtype != np.float32:
            epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
            return ((times - epoch) / 1e9).astype(int)
        return times
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..2831c0c
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -0,0 +1,83 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
class TimeSeriesReadingProcessor(TileReadingProcessor):
    """Reading processor that builds a ``TimeSeriesTile`` from one section of a granule."""

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        """
        :param variable_to_read: name of the data variable to read.
        :param latitude: name of the latitude variable.
        :param longitude: name of the longitude variable.
        :param time: name of the time variable (required).
        :param depth: optional name of the depth variable.
        """
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill ``input_tile.tile.time_series_tile`` with the section of ``ds`` selected by ``dimensions_to_slices``.

        Arrays are not squeezed; masked values become NaN. Times are converted with
        ``_convert_to_timestamp``.

        :raises RuntimeError: if a configured depth slice covers more than one element.
        """
        new_tile = nexusproto.TimeSeriesTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan, not np.NaN: the NaN alias was removed in NumPy 2.0.
        lat_subset = np.ma.filled(lat_subset, np.nan)
        lon_subset = np.ma.filled(lon_subset, np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(data_subset, np.nan)

        if self.depth:
            # Only the first dimension of the depth variable is checked and used.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.nan)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
        new_tile.time.CopyFrom(to_shaped_array(time_subset))

        input_tile.tile.time_series_tile.CopyFrom(new_tile)
        return input_tile
+
+ # def read_data(self, tile_specifications, file_path, output_tile):
+ # with xr.decode_cf(xr.open_dataset(file_path, decode_cf=False), decode_times=False) as ds:
+ # for section_spec, dimtoslice in tile_specifications:
+ # tile = nexusproto.TimeSeriesTile()
+ #
+ # instance_dimension = next(
+ # iter([dim for dim in ds[self.variable_to_read].dims if dim != self.time]))
+ #
+ # tile.latitude.CopyFrom(
+ # to_shaped_array(np.ma.filled(ds[self.latitude].data[dimtoslice[instance_dimension]], np.NaN)))
+ #
+ # tile.longitude.CopyFrom(
+ # to_shaped_array(
+ # np.ma.filled(ds[self.longitude].data[dimtoslice[instance_dimension]], np.NaN)))
+ #
+ # # Before we read the data we need to make sure the dimensions are in the proper order so we don't
+ # # have any indexing issues
+ # ordered_slices = slices_for_variable(ds, self.variable_to_read, dimtoslice)
+ # # Read data using the ordered slices, replacing masked values with NaN
+ # data_array = np.ma.filled(ds[self.variable_to_read].data[tuple(ordered_slices.values())], np.NaN)
+ #
+ # tile.variable_data.CopyFrom(to_shaped_array(data_array))
+ #
+ # if self.metadata is not None:
+ # tile.meta_data.add().CopyFrom(
+ # to_metadata(self.metadata, ds[self.metadata].data[tuple(ordered_slices.values())]))
+ #
+ # tile.time.CopyFrom(
+ # to_shaped_array(np.ma.filled(ds[self.time].data[dimtoslice[self.time]], np.NaN)))
+ #
+ # output_tile.tile.time_series_tile.CopyFrom(tile)
+ #
+ # yield output_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
new file mode 100644
index 0000000..2fecce9
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
@@ -0,0 +1,5 @@
+from granule_ingester.processors.reading_processors.EccoReadingProcessor import EccoReadingProcessor
+from granule_ingester.processors.reading_processors.GridReadingProcessor import GridReadingProcessor
+from granule_ingester.processors.reading_processors.SwathReadingProcessor import SwathReadingProcessor
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py
new file mode 100644
index 0000000..193cf69
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py
@@ -0,0 +1,55 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple,Set
+#
+# # from granule_ingester.entities import Dimension
+#
+#
+# class SliceFileByDimension:
+# def __init__(self,
+# slice_dimension: str, # slice by this dimension
+# dimension_name_prefix: str = None,
+# *args, **kwargs):
+# super().__init__(*args, **kwargs)
+# self._slice_by_dimension = slice_dimension
+# self._dimension_name_prefix = dimension_name_prefix
+#
+# def generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+# """
+# Generate list of slices in all dimensions as strings.
+#
+# :param dimension_specs: A dict of dimension names to dimension lengths
+# :return: A list of slices across all dimensions, as strings in the form of dim1:0:720,dim2:0:1,dim3:0:360
+# """
+# # check if sliceDimension is int or str
+# is_integer: bool = False
+# try:
+# int(self._slice_by_dimension)
+# is_integer = True
+# except ValueError:
+# pass
+#
+# return self._indexed_dimension_slicing(dimension_specs) if is_integer else self._generate_tile_boundary_slices(self._slice_by_dimension,dimension_specs)
+#
+# def _indexed_dimension_slicing(self, dimension_specs):
+# # python netCDF4 library automatically prepends "phony_dim" if indexed by integer
+# if self._dimension_name_prefix == None or self._dimension_name_prefix == "":
+# self._dimension_name_prefix = "phony_dim_"
+#
+# return self._generate_tile_boundary_slices((self._dimension_name_prefix+self._slice_by_dimension),dimension_specs)
+#
+# def _generate_tile_boundary_slices(self, slice_by_dimension, dimension_specs):
+# dimension_bounds = []
+# for dim_name,dim_len in dimension_specs.items():
+# step_size = 1 if dim_name==slice_by_dimension else dim_len
+#
+# bounds = []
+# for i in range(0,dim_len,step_size):
+# bounds.append(
+# '{name}:{start}:{end}'.format(name=dim_name,
+# start=i,
+# end=min((i + step_size),dim_len) )
+# )
+# dimension_bounds.append(bounds)
+# return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
+#
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
new file mode 100644
index 0000000..6e03336
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+from typing import List, Dict
+
+from granule_ingester.slicers.TileSlicer import TileSlicer
+
+logger = logging.getLogger(__name__)
+
+
class SliceFileByStepSize(TileSlicer):
    """Tile slicer that chunks each configured dimension by a fixed step size.

    Dimensions with no configured step size are kept whole, as a single chunk spanning
    their full length.
    """

    def __init__(self,
                 dimension_step_sizes: Dict[str, int],
                 *args, **kwargs):
        """
        :param dimension_step_sizes: mapping of dimension name to chunk length along that dimension.
        """
        super().__init__(*args, **kwargs)
        self._dimension_step_sizes = dimension_step_sizes

    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
        """Return one section-spec string per tile, e.g. ``"time:0:1,lat:0:30,lon:0:30"``.

        :param dimension_specs: mapping of dimension name to dimension length.
        :raises KeyError: if a configured dimension is not present in ``dimension_specs``.
        :raises ValueError: if a configured step size is not positive.
        """
        # Make sure all provided dimensions are in the dataset (checked before step sizes).
        for dim_name in self._dimension_step_sizes:
            if dim_name not in dimension_specs:
                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))

        # A negative step would silently produce zero tiles, and zero would fail inside range().
        for dim_name, step_size in self._dimension_step_sizes.items():
            if step_size <= 0:
                raise ValueError('Step size for dimension "{}" must be positive, got {}'.format(dim_name, step_size))

        slices = self._generate_chunk_boundary_slices(dimension_specs)
        logger.info("Sliced granule into %s slices.", len(slices))
        return slices

    def _generate_chunk_boundary_slices(self, dimension_specs) -> list:
        """Build the cartesian product of per-dimension ``name:start:end`` chunks, joined with commas."""
        dimension_bounds = []
        for dim_name, dim_len in dimension_specs.items():
            # Unconfigured dimensions form one whole-length chunk. max(..., 1) keeps a zero-length
            # dimension from producing range(0, 0, 0), which raises ValueError.
            step_size = self._dimension_step_sizes.get(dim_name, max(dim_len, 1))

            bounds = ['{name}:{start}:{end}'.format(name=dim_name,
                                                    start=start,
                                                    end=min(start + step_size, dim_len))
                      for start in range(0, dim_len, step_size)]
            dimension_bounds.append(bounds)
        return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py
new file mode 100644
index 0000000..a4d401a
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py
@@ -0,0 +1,68 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple
+#
+# # from granule_ingester.entities import Dimension
+#
+#
+# class SliceFileByTilesDesired:
+# def __init__(self,
+# tiles_desired: int,
+# desired_spatial_dimensions: List[str],
+# time_dimension: Tuple[str, int] = None,
+# *args, **kwargs):
+# super().__init__(*args, **kwargs)
+# self._tiles_desired = tiles_desired
+#
+# # check that desired_dimensions have no duplicates
+# self._desired_spatial_dimensions = desired_spatial_dimensions
+# self._time_dimension = time_dimension
+#
+# def generate_slices(self,
+# dimension_specs: Dict[str, int]) -> List[str]:
+# # check that dimension_specs contain all desired_dimensions
+# # check that there are no duplicate keys in dimension_specs
+# desired_dimension_specs = {key: dimension_specs[key]
+# for key in self._desired_spatial_dimensions}
+# spatial_slices = self._generate_spatial_slices(tiles_desired=self._tiles_desired,
+# dimension_specs=desired_dimension_specs)
+# if self._time_dimension:
+# temporal_slices = self._generate_temporal_slices(self._time_dimension)
+# return self._add_temporal_slices(temporal_slices, spatial_slices)
+# return spatial_slices
+#
+# def _add_temporal_slices(self, temporal_slices, spatial_slices) -> List[str]:
+# return ['{time},{space}'.format(time=temporal_slice, space=spatial_slice)
+# for spatial_slice in spatial_slices
+# for temporal_slice in temporal_slices]
+#
+# def _generate_temporal_slices(self, time_dimension: Tuple[str, int]):
+# return ['{time_dim}:{start}:{end}'.format(time_dim=time_dimension[0],
+# start=i,
+# end=i+1)
+# for i in range(time_dimension[1]-1)]
+#
+# def _generate_spatial_slices(self,
+# tiles_desired: int,
+# dimension_specs: Dict[str, int]) -> List[str]:
+# n_dimensions = len(dimension_specs)
+# dimension_bounds = []
+# for dim_name, dim_length in dimension_specs.items():
+# step_size = SliceFileByTilesDesired._calculate_step_size(
+# dim_length, tiles_desired, n_dimensions)
+# bounds = []
+# start_loc = 0
+# while start_loc < dim_length:
+# end_loc = start_loc + step_size if start_loc + \
+# step_size < dim_length else dim_length
+# bounds.append('{name}:{start}:{end}'.format(name=dim_name,
+# start=start_loc,
+# end=end_loc))
+# start_loc += step_size
+# dimension_bounds.append(bounds)
+# return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
+#
+# @staticmethod
+# def _calculate_step_size(dim_length, chunks_desired, n_dimensions) -> int:
+# chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions)
+# return math.floor(dim_length / chunks_per_dim)
diff --git a/granule_ingester/granule_ingester/slicers/TileSlicer.py b/granule_ingester/granule_ingester/slicers/TileSlicer.py
new file mode 100644
index 0000000..06cf094
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/TileSlicer.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import List
+
+import xarray as xr
+from nexusproto.DataTile_pb2 import NexusTile
+
+
+class TileSlicer(ABC):
+    """Base class for slicers that split a granule into tile section specs.
+
+    Subclasses implement ``_generate_slices`` to turn a dataset's dimensions
+    into a list of section-spec strings. ``generate_tiles`` computes that list
+    and returns the slicer itself, which then yields one skeleton ``NexusTile``
+    per spec (only ``summary.section_spec`` and ``summary.granule`` are set).
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._granule_name = None
+        self._current_tile_spec_index = 0
+        self._tile_spec_list: List[str] = []
+
+    def __iter__(self):
+        return self
+
+    def __next__(self) -> NexusTile:
+        # ``>=`` rather than ``==``: an index past the end must still stop
+        # iteration instead of raising IndexError.
+        if self._current_tile_spec_index >= len(self._tile_spec_list):
+            raise StopIteration
+
+        current_tile_spec = self._tile_spec_list[self._current_tile_spec_index]
+        self._current_tile_spec_index += 1
+
+        tile = NexusTile()
+        tile.summary.section_spec = current_tile_spec
+        # Protobuf string fields do not accept None; leave the default "" instead.
+        if self._granule_name is not None:
+            tile.summary.granule = self._granule_name
+        return tile
+
+    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
+        """Compute the tile specs for ``dataset`` and return ``self`` as an iterator.
+
+        :param dataset: the opened granule; only its ``dims`` are used.
+        :param granule_name: copied into every yielded tile's ``summary.granule``.
+        :return: this slicer, positioned at the first spec.
+        """
+        self._granule_name = granule_name
+        self._tile_spec_list = self._generate_slices(dataset.dims)
+        # Restart iteration; otherwise reusing the slicer for another granule
+        # would resume from the previous granule's final index.
+        self._current_tile_spec_index = 0
+
+        return self
+
+    @abstractmethod
+    def _generate_slices(self, dimensions):
+        """Return the list of section-spec strings for the given dimensions."""
+        pass
diff --git a/granule_ingester/granule_ingester/slicers/__init__.py b/granule_ingester/granule_ingester/slicers/__init__.py
new file mode 100644
index 0000000..b13ea59
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/__init__.py
@@ -0,0 +1,2 @@
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
+from granule_ingester.slicers.TileSlicer import TileSlicer
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
new file mode 100644
index 0000000..7a9f146
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import uuid
+
+from cassandra.cluster import Cluster, Session
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from nexusproto.DataTile_pb2 import NexusTile, TileData
+
+from granule_ingester.writers.DataStore import DataStore
+
+# Runs at import time: sets the process-wide 'cassandra' driver logger to INFO.
+logging.getLogger('cassandra').setLevel(logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+# cqlengine model for the nexustiles.sea_surface_temp table.
+# NOTE(review): CassandraStore issues raw CQL through the session and does not
+# reference this model in the visible code -- confirm whether it is still used.
+class TileModel(Model):
+ __keyspace__ = "nexustiles"
+ __table_name__ = "sea_surface_temp"
+ # Primary key; CassandraStore.save_data writes uuid.UUID(tile.summary.tile_id) here.
+ tile_id = columns.UUID(primary_key=True)
+ # CassandraStore.save_data writes the serialized TileData protobuf bytes here.
+ tile_blob = columns.Bytes(index=True)
+
+
+class CassandraStore(DataStore):
+    """DataStore that writes serialized tile data to Cassandra.
+
+    Rows go into ``nexustiles.sea_surface_temp`` as ``(tile_id, tile_blob)``.
+    ``connect()`` must be called before ``save_data()``.
+    """
+
+    _INSERT_QUERY = "INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)"
+
+    def __init__(self, contact_points=None, port=9042):
+        self._contact_points = contact_points
+        self._port = port
+        self._session = None
+        # Prepared lazily on the first save and reused; preparing per insert
+        # costs an extra server round trip every time.
+        self._insert_statement = None
+
+    async def health_check(self) -> bool:
+        """Return True if a session to the cluster can be opened, else False."""
+        try:
+            session = self._get_session()
+            # Shut down the Cluster, not just the Session, so the control
+            # connection and driver threads are released too.
+            session.cluster.shutdown()
+            return True
+        except Exception:
+            logger.exception("Cannot connect to Cassandra!")
+            return False
+
+    def _get_session(self) -> Session:
+        """Open a new Cluster and return a Session bound to the 'nexustiles' keyspace."""
+        cluster = Cluster(contact_points=self._contact_points, port=self._port)
+        try:
+            session = cluster.connect()
+            session.set_keyspace('nexustiles')
+        except Exception:
+            # Don't leak the cluster's connections/threads on a failed connect.
+            cluster.shutdown()
+            raise
+        return session
+
+    def connect(self):
+        """Open the session used by ``save_data``."""
+        self._session = self._get_session()
+        # A statement prepared on a previous session must not be reused.
+        self._insert_statement = None
+
+    def __del__(self):
+        session = getattr(self, '_session', None)
+        if session:
+            session.cluster.shutdown()
+
+    async def save_data(self, tile: NexusTile) -> None:
+        """Insert the serialized ``tile.tile`` keyed by ``tile.summary.tile_id``.
+
+        :raises ValueError: if ``tile.summary.tile_id`` is not a valid UUID string.
+        """
+        tile_id = uuid.UUID(tile.summary.tile_id)
+        serialized_tile_data = TileData.SerializeToString(tile.tile)
+        if self._insert_statement is None:
+            self._insert_statement = self._session.prepare(self._INSERT_QUERY)
+        await type(self)._execute_query_async(self._session,
+                                              self._insert_statement,
+                                              [tile_id, bytearray(serialized_tile_data)])
+
+    @staticmethod
+    async def _execute_query_async(session: Session, query, parameters=None):
+        """Run ``query`` via ``session.execute_async`` and await its result.
+
+        The driver may invoke callbacks on its own I/O thread, and
+        ``asyncio.Future`` is not thread-safe, so results are handed back to
+        the running event loop with ``call_soon_threadsafe``.
+        """
+        loop = asyncio.get_running_loop()
+        asyncio_future = loop.create_future()
+
+        def _resolve(setter, value):
+            # The awaiting task may have been cancelled in the meantime.
+            if not asyncio_future.done():
+                setter(value)
+
+        cassandra_future = session.execute_async(query, parameters)
+        cassandra_future.add_callbacks(
+            lambda result: loop.call_soon_threadsafe(_resolve, asyncio_future.set_result, result),
+            lambda error: loop.call_soon_threadsafe(_resolve, asyncio_future.set_exception, error))
+        return await asyncio_future
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
new file mode 100644
index 0000000..889d41e
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -0,0 +1,13 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.healthcheck import HealthCheck
+
+
+class DataStore(HealthCheck, ABC):
+    """Interface for stores that persist a tile's data payload."""
+
+    @abstractmethod
+    async def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
+        """Persist the data portion of ``nexus_tile``.
+
+        Declared ``async`` to match the concrete implementation
+        (``CassandraStore.save_data`` is a coroutine), so the result must be awaited.
+        """
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
new file mode 100644
index 0000000..26311af
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -0,0 +1,11 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.healthcheck import HealthCheck
+
+
+class MetadataStore(HealthCheck, ABC):
+    """Interface for stores that index a tile's searchable metadata."""
+
+    @abstractmethod
+    async def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
+        """Persist the metadata (summary) of ``nexus_tile``.
+
+        Declared ``async`` to match the concrete implementation
+        (``SolrStore.save_metadata`` is a coroutine), so the result must be awaited.
+        """
+        pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
new file mode 100644
index 0000000..9d6a7f0
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from asyncio import AbstractEventLoop
+
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+import aiohttp
+from nexusproto.DataTile_pb2 import *
+from tenacity import *
+
+from granule_ingester.writers.MetadataStore import MetadataStore
+
+logger = logging.getLogger(__name__)
+
+
+class SolrStore(MetadataStore):
+    """MetadataStore that indexes NexusTile summaries as documents in a Solr collection."""
+
+    def __init__(self, host_and_port='http://localhost:8983'):
+        super().__init__()
+
+        self.TABLE_NAME = "sea_surface_temp"
+        self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
+
+        self._host_and_port = host_and_port
+        self.geo_precision: int = 3
+        self.collection: str = "nexustiles"
+        # Same logger object as the module-level ``logger``; setting DEBUG here
+        # changes that logger's level process-wide.
+        self.log: logging.Logger = logging.getLogger(__name__)
+        self.log.setLevel(logging.DEBUG)
+        self._session = None
+
+    def connect(self, loop: AbstractEventLoop = None):
+        """Create the aiohttp session used for document updates."""
+        self._session = aiohttp.ClientSession(loop=loop)
+
+    async def health_check(self):
+        """Return True if the collection's admin/ping endpoint answers HTTP 200."""
+        try:
+            async with aiohttp.ClientSession() as session:
+                ping_url = '{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection)
+                async with session.get(ping_url) as response:
+                    if response.status == 200:
+                        return True
+                    logger.error("Solr health check returned status {}.".format(response.status))
+        except aiohttp.ClientConnectionError:
+            logger.error("Cannot connect to Solr!")
+
+        return False
+
+    async def save_metadata(self, nexus_tile: NexusTile) -> None:
+        """Build the Solr document for ``nexus_tile`` and post it to the collection."""
+        solr_doc = self._build_solr_doc(nexus_tile)
+
+        await self._save_document(self.collection, solr_doc)
+
+    @retry(stop=stop_after_attempt(5))
+    async def _save_document(self, collection: str, doc: dict):
+        """POST ``doc`` to ``collection`` with an immediate commit.
+
+        Raises RuntimeError for a status outside 200-399; the ``retry``
+        decorator re-attempts up to 5 times before giving up.
+        """
+        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
+        # Use the response as a context manager so its connection is returned
+        # to the session's pool even though the body is never read.
+        async with self._session.post(url, json=doc) as response:
+            if response.status < 200 or response.status >= 400:
+                raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+
+    def _build_solr_doc(self, tile: NexusTile) -> Dict:
+        """Flatten a tile's summary, bounding box and stats into a Solr input document."""
+        summary: TileSummary = tile.summary
+        bbox: TileSummary.BBox = summary.bbox
+        stats: TileSummary.DataStats = summary.stats
+
+        min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
+        max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)
+
+        geo = self.determine_geo(bbox)
+
+        granule_file_name: str = Path(summary.granule).name  # get base filename
+
+        # Name of whichever member of the "tile_type" oneof is set (e.g. grid_tile, ecco_tile).
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+
+        input_document = {
+            'table_s': self.TABLE_NAME,
+            'geo': geo,
+            'id': summary.tile_id,
+            'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id),
+            'sectionSpec_s': summary.section_spec,
+            'dataset_s': summary.dataset_name,
+            'granule_s': granule_file_name,
+            'tile_var_name_s': summary.data_var_name,
+            'tile_min_lon': bbox.lon_min,
+            'tile_max_lon': bbox.lon_max,
+            'tile_min_lat': bbox.lat_min,
+            'tile_max_lat': bbox.lat_max,
+            'tile_depth': tile_data.depth,
+            'tile_min_time_dt': min_time,
+            'tile_max_time_dt': max_time,
+            'tile_min_val_d': stats.min,
+            'tile_max_val_d': stats.max,
+            'tile_avg_val_d': stats.mean,
+            'tile_count_i': int(stats.count)
+        }
+
+        # NOTE(review): a falsy tile index (0) is skipped here -- confirm that is intended.
+        ecco_tile_id = getattr(tile_data, 'tile', None)
+        if ecco_tile_id:
+            input_document['ecco_tile'] = ecco_tile_id
+
+        for attribute in summary.global_attributes:
+            # Python protobuf exposes fields directly (``name`` and the repeated
+            # ``values``); the Java-style getName()/getValues() accessors do not exist.
+            values = attribute.values
+            input_document[attribute.name] = values[0] if len(values) == 1 else list(values)
+
+        return input_document
+
+    @staticmethod
+    def _format_latlon_string(value):
+        rounded_value = round(value, 3)
+        return '{:.3f}'.format(rounded_value)
+
+    @classmethod
+    def determine_geo(cls, bbox: TileSummary.BBox) -> str:
+        """Return a WKT POINT, LINESTRING or POLYGON string for ``bbox``.
+
+        Solr cannot index a POLYGON whose corners are all the same point or
+        that has only 2 distinct points (a line). Solr is configured for a
+        specific precision, so equality is checked on the values rounded to
+        that precision (the same strings that are emitted), not the raw floats.
+        """
+        lat_min_str = cls._format_latlon_string(bbox.lat_min)
+        lat_max_str = cls._format_latlon_string(bbox.lat_max)
+        lon_min_str = cls._format_latlon_string(bbox.lon_min)
+        lon_max_str = cls._format_latlon_string(bbox.lon_max)
+
+        # Compare as floats so '-0.000' and '0.000' count as equal.
+        same_lat = float(lat_min_str) == float(lat_max_str)
+        same_lon = float(lon_min_str) == float(lon_max_str)
+
+        if same_lat and same_lon:
+            return 'POINT({} {})'.format(lon_min_str, lat_min_str)
+        if same_lat or same_lon:
+            # One axis collapses, so the box is a line from (lon_min, lat_min)
+            # to (lon_max, lat_max); this covers both horizontal and vertical lines.
+            return 'LINESTRING({} {}, {} {})'.format(lon_min_str, lat_min_str, lon_max_str, lat_max_str)
+        return 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(lon_min_str, lat_min_str,
+                                                                     lon_max_str, lat_min_str,
+                                                                     lon_max_str, lat_max_str,
+                                                                     lon_min_str, lat_max_str,
+                                                                     lon_min_str, lat_min_str)
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
new file mode 100644
index 0000000..9323d8c
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -0,0 +1,4 @@
+from granule_ingester.writers.DataStore import DataStore
+from granule_ingester.writers.MetadataStore import MetadataStore
+from granule_ingester.writers.SolrStore import SolrStore
+from granule_ingester.writers.CassandraStore import CassandraStore
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
new file mode 100644
index 0000000..4d9d4cb
--- /dev/null
+++ b/granule_ingester/requirements.txt
@@ -0,0 +1,3 @@
+cassandra-driver==3.23.0
+aiomultiprocess
+aioboto3
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
new file mode 100644
index 0000000..2a5920e
--- /dev/null
+++ b/granule_ingester/setup.py
@@ -0,0 +1,34 @@
+from subprocess import check_call, CalledProcessError
+
+from setuptools import setup, find_packages
+
+import os
+
+# Resolve data files next to setup.py so the build does not depend on the CWD.
+_HERE = os.path.dirname(os.path.abspath(__file__))
+
+with open(os.path.join(_HERE, 'requirements.txt')) as f:
+    # Keep only requirement specifiers: drop blank lines and comment lines.
+    pip_requirements = [line.strip() for line in f
+                        if line.strip() and not line.strip().startswith('#')]
+
+try:
+    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file',
+                os.path.join(_HERE, 'conda-requirements.txt')])
+except (CalledProcessError, IOError) as e:
+    raise EnvironmentError("Error installing conda packages", e) from e
+
+__version__ = '1.0.0-SNAPSHOT'
+
+setup(
+    name='sdap_granule_ingester',
+    version=__version__,
+    url="https://github.com/apache/incubator-sdap-ingester",
+    author="dev@sdap.apache.org",
+    author_email="dev@sdap.apache.org",
+    description="Python modules that can be used for NEXUS ingest.",
+    install_requires=pip_requirements,
+    packages=find_packages(
+        exclude=["*.tests", "*.tests.*", "tests.*", "tests", "scripts"]),
+    test_suite="tests",
+    platforms='any',
+    python_requires='>=3.7',
+    classifiers=[
+        'Development Status :: 1 - Pre-Alpha',
+        'Intended Audience :: Developers',
+        'Operating System :: OS Independent',
+        'Programming Language :: Python :: 3.7',
+    ]
+)
diff --git a/granule_ingester/tests/__init__.py b/granule_ingester/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/config_files/analysed_sst.yml b/granule_ingester/tests/config_files/analysed_sst.yml
new file mode 100644
index 0000000..9148f98
--- /dev/null
+++ b/granule_ingester/tests/config_files/analysed_sst.yml
@@ -0,0 +1,16 @@
+slicer:
+ name: sliceFileByStepSize
+ dimension_step_sizes:
+ time: 1
+ lon: 10
+ lat: 10
+processors:
+ - name: GridReadingProcessor
+ latitude: lat
+ longitude: lon
+ time: time
+ variable_to_read: analysed_sst
+ - name: emptyTileFilter
+ - name: tileSummary
+ dataset_name: AVHRR_sst
+ - name: generateTileId
diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
new file mode 100644
index 0000000..9af889d
--- /dev/null
+++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
@@ -0,0 +1,17 @@
+granule:
+ resource: ../foo/bar.nc
+slicer:
+ name: sliceFileByStepSize
+ dimension_step_sizes:
+ time: 1
+ lat: 33
+ lon: 26
+processors:
+ - name: EccoReadingProcessor
+ latitude: YC
+ longitude: XC
+ time: time
+ depth: Z
+ tile: tile
+ variable_to_read: THETA
+ - name: generateTileId
diff --git a/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc
new file mode 100644
index 0000000..4935c81
Binary files /dev/null and b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc differ
diff --git a/granule_ingester/tests/granules/OBP_2017_01.nc b/granule_ingester/tests/granules/OBP_2017_01.nc
new file mode 100644
index 0000000..8c9b584
Binary files /dev/null and b/granule_ingester/tests/granules/OBP_2017_01.nc differ
diff --git a/granule_ingester/tests/granules/OBP_native_grid.nc b/granule_ingester/tests/granules/OBP_native_grid.nc
new file mode 100755
index 0000000..addb8a0
Binary files /dev/null and b/granule_ingester/tests/granules/OBP_native_grid.nc differ
diff --git a/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5
new file mode 100644
index 0000000..11815dd
Binary files /dev/null and b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 differ
diff --git a/granule_ingester/tests/granules/THETA_199201.nc b/granule_ingester/tests/granules/THETA_199201.nc
new file mode 100644
index 0000000..ad92a61
Binary files /dev/null and b/granule_ingester/tests/granules/THETA_199201.nc differ
diff --git a/granule_ingester/tests/granules/empty_mur.nc4 b/granule_ingester/tests/granules/empty_mur.nc4
new file mode 100644
index 0000000..f65c808
Binary files /dev/null and b/granule_ingester/tests/granules/empty_mur.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_ascatb.nc4 b/granule_ingester/tests/granules/not_empty_ascatb.nc4
new file mode 100644
index 0000000..d8ef90b
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_ascatb.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_avhrr.nc4 b/granule_ingester/tests/granules/not_empty_avhrr.nc4
new file mode 100644
index 0000000..af24071
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_avhrr.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_ccmp.nc b/granule_ingester/tests/granules/not_empty_ccmp.nc
new file mode 100644
index 0000000..b7b491d
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_ccmp.nc differ
diff --git a/granule_ingester/tests/granules/not_empty_mur.nc4 b/granule_ingester/tests/granules/not_empty_mur.nc4
new file mode 100644
index 0000000..09d31fd
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_mur.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_smap.h5 b/granule_ingester/tests/granules/not_empty_smap.h5
new file mode 100644
index 0000000..956cbc5
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_smap.h5 differ
diff --git a/granule_ingester/tests/granules/not_empty_wswm.nc b/granule_ingester/tests/granules/not_empty_wswm.nc
new file mode 100644
index 0000000..ce0ebcc
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_wswm.nc differ
diff --git a/granule_ingester/tests/pipeline/__init__.py b/granule_ingester/tests/pipeline/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
new file mode 100644
index 0000000..c18bf8b
--- /dev/null
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -0,0 +1,104 @@
+import os
+import unittest
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.pipeline.Pipeline import Pipeline
+from granule_ingester.processors import GenerateTileId
+from granule_ingester.processors.reading_processors import EccoReadingProcessor
+from granule_ingester.slicers.SliceFileByStepSize import *
+from granule_ingester.writers import DataStore, MetadataStore
+
+
+class TestPipeline(unittest.TestCase):
+    """Unit tests for Pipeline config parsing and module construction."""
+
+    class MockProcessorNoParams:
+        def __init__(self):
+            pass
+
+    class MockProcessorWithParams:
+        def __init__(self, test_param):
+            self.test_param = test_param
+
+    def test_parse_config(self):
+        class MockDataStore(DataStore):
+            def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
+                pass
+
+        class MockMetadataStore(MetadataStore):
+            def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
+                pass
+
+        relative_path = "../config_files/ingestion_config_testfile.yaml"
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        pipeline = Pipeline.from_file(config_path=str(file_path),
+                                      data_store_factory=MockDataStore,
+                                      metadata_store_factory=MockMetadataStore)
+
+        self.assertEqual(pipeline._data_store_factory, MockDataStore)
+        self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
+        self.assertEqual(type(pipeline._slicer), SliceFileByStepSize)
+        self.assertEqual(type(pipeline._tile_processors[0]), EccoReadingProcessor)
+        self.assertEqual(type(pipeline._tile_processors[1]), GenerateTileId)
+
+    def test_parse_module(self):
+        module_mappings = {
+            "sliceFileByStepSize": SliceFileByStepSize
+        }
+
+        module_config = {
+            "name": "sliceFileByStepSize",
+            "dimension_step_sizes": {
+                "time": 1,
+                "lat": 10,
+                "lon": 10
+            }
+        }
+        module = Pipeline._parse_module(module_config, module_mappings)
+        self.assertEqual(SliceFileByStepSize, type(module))
+        self.assertEqual(module_config['dimension_step_sizes'], module._dimension_step_sizes)
+
+    def test_parse_module_with_no_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams}
+        module_config = {"name": "MockModule"}
+        module = Pipeline._parse_module(module_config, module_mappings)
+        self.assertEqual(type(module), TestPipeline.MockProcessorNoParams)
+
+    def test_parse_module_with_too_many_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams}
+        module_config = {
+            "name": "MockModule",
+            "bogus_param": True
+        }
+        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+
+    def test_parse_module_with_missing_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams}
+        module_config = {
+            "name": "MockModule"
+        }
+
+        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+
+    # An empty body would report a pass; skip explicitly so the gap stays visible.
+    @unittest.skip("test_process_tile is not implemented yet; see the commented-out sketch")
+    def test_process_tile(self):
+        # class MockIdProcessor:
+        #     def process(self, tile, *args, **kwargs):
+        #         tile.summary.tile_id = "test_id"
+        #         return tile
+        #
+        # class MockReadingProcessor:
+        #     def process(self, tile, *args, **kwargs):
+        #         dataset = kwargs['dataset']
+        #         tile.tile.grid_tile.variable_data.CopyFrom(to_shaped_array(dataset['test_variable']))
+        #         return tile
+        #
+        # test_dataset = xr.Dataset({"test_variable": [1, 2, 3]})
+        # input_tile = nexusproto.NexusTile.SerializeToString(NexusTile())
+        # processor_list = [MockIdProcessor(), MockReadingProcessor()]
+        #
+        # output_tile = _process_tile_in_worker(processor_list, test_dataset, input_tile)
+        # output_tile = nexusproto.NexusTile.FromString(output_tile)
+        # tile_data = from_shaped_array(output_tile.tile.grid_tile.variable_data)
+        #
+        # np.testing.assert_equal(tile_data, [1, 2, 3])
+        # self.assertEqual(output_tile.summary.tile_id, "test_id")
+        ...
diff --git a/granule_ingester/tests/processors/__init__.py b/granule_ingester/tests/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/processors/test_GenerateTileId.py b/granule_ingester/tests/processors/test_GenerateTileId.py
new file mode 100644
index 0000000..17f1677
--- /dev/null
+++ b/granule_ingester/tests/processors/test_GenerateTileId.py
@@ -0,0 +1,22 @@
+import unittest
+
+import uuid
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors import GenerateTileId
+
+
+class TestGenerateTileId(unittest.TestCase):
+    """Checks that GenerateTileId assigns a deterministic UUID3-based tile id."""
+
+    def test_process(self):
+        section_spec = 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9'
+
+        tile = nexusproto.NexusTile()
+        tile.summary.granule = 'test_dir/test_granule.nc'
+        tile.summary.data_var_name = 'test_variable'
+        tile.summary.section_spec = section_spec
+
+        # The expected name uses the granule's base name, not its directory.
+        expected_id = uuid.uuid3(uuid.NAMESPACE_DNS, 'test_granule.nc' + 'test_variable' + section_spec)
+
+        processed = GenerateTileId().process(tile)
+        self.assertEqual(str(expected_id), processed.summary.tile_id)
diff --git a/granule_ingester/tests/reading_processors/__init__.py b/granule_ingester/tests/reading_processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
new file mode 100644
index 0000000..f2e9f29
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import EccoReadingProcessor
+
+
+class TestEccoReadingProcessor(unittest.TestCase):
+    """Tests EccoReadingProcessor._generate_tile against the OBP native-grid granule."""
+
+    @staticmethod
+    def _granule_path():
+        return path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+
+    @staticmethod
+    def _make_processor():
+        return EccoReadingProcessor(variable_to_read='OBP',
+                                    latitude='YC',
+                                    longitude='XC',
+                                    time='time',
+                                    tile='tile')
+
+    def _read(self, input_tile, dimensions_to_slices):
+        processor = self._make_processor()
+        with xr.open_dataset(self._granule_path(), decode_cf=True) as ds:
+            return processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+    def _check_ecco_tile(self, ecco_tile):
+        # Slicing tile 10, one time step, 15 j-rows and 7 i-columns gives a 15x7 tile.
+        self.assertEqual(ecco_tile.tile, 10)
+        self.assertEqual(ecco_tile.time, 695563200)
+        self.assertEqual(ecco_tile.variable_data.shape, [15, 7])
+        self.assertEqual(ecco_tile.latitude.shape, [15, 7])
+        self.assertEqual(ecco_tile.longitude.shape, [15, 7])
+
+    def test_generate_tile(self):
+        granule_path = self._granule_path()
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        output_tile = self._read(input_tile, {
+            'time': slice(0, 1),
+            'tile': slice(10, 11),
+            'j': slice(0, 15),
+            'i': slice(0, 7)
+        })
+
+        self.assertEqual(output_tile.summary.granule, granule_path)
+        self._check_ecco_tile(output_tile.tile.ecco_tile)
+
+    def test_generate_tile_with_dims_out_of_order(self):
+        # Same slices as test_generate_tile, deliberately listed in a different key order.
+        output_tile = self._read(nexusproto.NexusTile(), {
+            'j': slice(0, 15),
+            'tile': slice(10, 11),
+            'i': slice(0, 7),
+            'time': slice(0, 1)
+        })
+
+        self._check_ecco_tile(output_tile.tile.ecco_tile)
diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
new file mode 100644
index 0000000..aec3ae8
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.reading_processors import GridReadingProcessor
+
+
+ class TestReadMurData(unittest.TestCase):
+     """GridReadingProcessor tests against small MUR analysed_sst granules."""
+
+     def _check_mur_granule(self, granule_name, expected_valid_count):
+         """Read a time:0:1, lat:0:10, lon:0:5 window from a MUR test granule and check it.
+
+         :param granule_name: file name inside the tests' ``granules`` directory.
+         :param expected_valid_count: number of non-NaN/non-inf cells expected in
+             the tile's variable data.
+         """
+         reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules', granule_name)
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'time': slice(0, 1),
+             'lat': slice(0, 10),
+             'lon': slice(0, 5)
+         }
+         with xr.open_dataset(granule_path) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual(1451638800, output_tile.tile.grid_tile.time)
+         self.assertEqual([10, 5], output_tile.tile.grid_tile.variable_data.shape)
+         self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape)
+         self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+
+         # masked_invalid masks NaN/inf, so count() is the number of valid cells.
+         masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+         self.assertEqual(expected_valid_count, np.ma.count(masked_data))
+
+     def test_read_empty_mur(self):
+         """The empty MUR granule yields no valid cells in the 10 x 5 window."""
+         self._check_mur_granule('empty_mur.nc4', expected_valid_count=0)
+
+     def test_read_not_empty_mur(self):
+         """The populated MUR granule yields 50 valid cells, i.e. the whole 10 x 5 window."""
+         self._check_mur_granule('not_empty_mur.nc4', expected_valid_count=50)
+
+
+ class TestReadCcmpData(unittest.TestCase):
+     """GridReadingProcessor tests against a CCMP uwnd granule."""
+
+     def test_read_not_empty_ccmp(self):
+         """Read a time:0:1, latitude:0:38, longitude:0:87 window and check shapes, time and valid count."""
+         reading_processor = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ccmp.nc')
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'time': slice(0, 1),
+             'latitude': slice(0, 38),
+             'longitude': slice(0, 87)
+         }
+         with xr.open_dataset(granule_path) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual(1451606400, output_tile.tile.grid_tile.time)
+         self.assertEqual([38, 87], output_tile.tile.grid_tile.variable_data.shape)
+         self.assertEqual([38], output_tile.tile.grid_tile.latitude.shape)
+         self.assertEqual([87], output_tile.tile.grid_tile.longitude.shape)
+
+         masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+         self.assertEqual(3306, np.ma.count(masked_data))
+
+         # NOTE: legacy assertions written against the older process()/section_spec
+         # interface, kept for reference. They additionally checked meta_data
+         # (meta='vwnd') and the latitude range; TODO port them to _generate_tile.
+         #
+         # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_ccmp.nc')
+         #
+         # ccmp_reader = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time', meta='vwnd')
+         #
+         # input_tile = nexusproto.NexusTile()
+         # tile_summary = nexusproto.TileSummary()
+         # tile_summary.granule = "file:%s" % test_file
+         # tile_summary.section_spec = "time:0:1,longitude:0:87,latitude:0:38"
+         # input_tile.summary.CopyFrom(tile_summary)
+         #
+         # results = list(ccmp_reader.process(input_tile))
+         #
+         # self.assertEqual(1, len(results))
+         #
+         # # with open('./ccmp_nonempty_nexustile.bin', 'w') as f:
+         # #     f.write(results[0])
+         #
+         # for nexus_tile in results:
+         #     self.assertTrue(nexus_tile.HasField('tile'))
+         #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+         #     self.assertEqual(1, len(nexus_tile.tile.grid_tile.meta_data))
+         #
+         #     tile = nexus_tile.tile.grid_tile
+         #     self.assertEqual(38, from_shaped_array(tile.latitude).size)
+         #     self.assertEqual(87, from_shaped_array(tile.longitude).size)
+         #     self.assertEqual((1, 38, 87), from_shaped_array(tile.variable_data).shape)
+         #
+         # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))
+         # self.assertEqual(3306, np.ma.count(tile1_data))
+         # self.assertAlmostEqual(-78.375,
+         #                        np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+         #                        places=3)
+         # self.assertAlmostEqual(-69.125,
+         #                        np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+         #                        places=3)
+         #
+         # self.assertEqual(1451606400, results[0].tile.grid_tile.time)
+
+
+ class TestReadAvhrrData(unittest.TestCase):
+     """GridReadingProcessor tests against an AVHRR analysed_sst granule."""
+
+     def test_read_not_empty_avhrr(self):
+         """Read a time:0:1, lat:0:5, lon:0:10 window and check shapes, time and valid count."""
+         reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules/not_empty_avhrr.nc4')
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'time': slice(0, 1),
+             'lat': slice(0, 5),
+             'lon': slice(0, 10)
+         }
+         with xr.open_dataset(granule_path) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual(1462060800, output_tile.tile.grid_tile.time)
+         self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape)
+         self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape)
+         self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape)
+
+         masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+         self.assertEqual(50, np.ma.count(masked_data))
+
+         # NOTE: legacy assertions written against the older process()/section_spec
+         # interface, kept for reference. They used a 10 x 10 lat/lon window (not the
+         # 5 x 10 window above) and also checked the latitude range and the first data
+         # value; TODO port them to _generate_tile.
+         #
+         # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_avhrr.nc4')
+         #
+         # avhrr_reader = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+         #
+         # input_tile = nexusproto.NexusTile()
+         # tile_summary = nexusproto.TileSummary()
+         # tile_summary.granule = "file:%s" % test_file
+         # tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10"
+         # input_tile.summary.CopyFrom(tile_summary)
+         #
+         # results = list(avhrr_reader.process(input_tile))
+         #
+         # self.assertEqual(1, len(results))
+         #
+         # for nexus_tile in results:
+         #     self.assertTrue(nexus_tile.HasField('tile'))
+         #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+         #
+         #     tile = nexus_tile.tile.grid_tile
+         #     self.assertEqual(10, from_shaped_array(tile.latitude).size)
+         #     self.assertEqual(10, from_shaped_array(tile.longitude).size)
+         #     self.assertEqual((1, 10, 10), from_shaped_array(tile.variable_data).shape)
+         #
+         # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))
+         # self.assertEqual(100, np.ma.count(tile1_data))
+         # self.assertAlmostEqual(-39.875,
+         #                        np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+         #                        places=3)
+         # self.assertAlmostEqual(-37.625,
+         #                        np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+         #                        places=3)
+         #
+         # self.assertEqual(1462060800, results[0].tile.grid_tile.time)
+         # self.assertAlmostEqual(289.71,
+         #                        np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))[
+         #                            0, 0, 0],
+         #                        places=3)
+
+
+ class TestReadInterpEccoData(unittest.TestCase):
+     """GridReadingProcessor tests against the OBP_2017_01 ECCO granule."""
+
+     def setUp(self):
+         # self.module is only referenced by the commented-out legacy test below;
+         # test_read_indexed_ecco builds its own processor.
+         self.module = GridReadingProcessor(
+             'OBP', 'latitude', 'longitude', x_dim='i', y_dim='j', time='time')
+
+     def test_read_indexed_ecco(self):
+         """Read a time:0:1, j:0:5, i:0:10 window of OBP and check shapes, time and valid count."""
+         reading_processor = GridReadingProcessor(
+             variable_to_read='OBP',
+             latitude='latitude',
+             longitude='longitude',
+             time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc')
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'time': slice(0, 1),
+             'j': slice(0, 5),
+             'i': slice(0, 10)
+         }
+         with xr.open_dataset(granule_path) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual(1484568000, output_tile.tile.grid_tile.time)
+         self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape)
+         self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape)
+         self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape)
+
+         masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+         self.assertEqual(50, np.ma.count(masked_data))
+
+         # NOTE: legacy assertions written against the older process()/section_spec
+         # interface (they use self.module from setUp), kept for reference; TODO port
+         # them to _generate_tile.
+         #
+         # test_file = path.join(path.dirname(__file__), 'datafiles', 'OBP_2017_01.nc')
+         #
+         # input_tile = nexusproto.NexusTile()
+         # tile_summary = nexusproto.TileSummary()
+         # tile_summary.granule = "file:%s" % test_file
+         # tile_summary.section_spec = "time:0:1,j:0:10,i:0:10"
+         # input_tile.summary.CopyFrom(tile_summary)
+         #
+         # results = list(self.module.process(input_tile))
+         #
+         # self.assertEqual(1, len(results))
+         #
+         # for nexus_tile in results:
+         #     self.assertTrue(nexus_tile.HasField('tile'))
+         #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+         #
+         #     tile = nexus_tile.tile.grid_tile
+         #     self.assertEqual(10, len(from_shaped_array(tile.latitude)))
+         #     self.assertEqual(10, len(from_shaped_array(tile.longitude)))
+         #
+         #     the_data = np.ma.masked_invalid(from_shaped_array(tile.variable_data))
+         #     self.assertEqual((1, 10, 10), the_data.shape)
+         #     self.assertEqual(100, np.ma.count(the_data))
+         #     self.assertEqual(1484568000, tile.time)
+
+
+ if __name__ == '__main__':
+     unittest.main()
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
new file mode 100644
index 0000000..55ac4fc
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import SwathReadingProcessor
+
+
+ class TestReadAscatbData(unittest.TestCase):
+     """SwathReadingProcessor tests against an ASCAT-B wind_speed granule."""
+
+     def test_read_not_empty_ascatb(self):
+         """Read a NUMROWS:0:1, NUMCELLS:0:82 window; time, data, lat and lon are all 1 x 82."""
+         reading_processor = SwathReadingProcessor(
+             variable_to_read='wind_speed',
+             latitude='lat',
+             longitude='lon',
+             time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4')
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'NUMROWS': slice(0, 1),
+             'NUMCELLS': slice(0, 82)
+         }
+         with xr.open_dataset(granule_path, decode_cf=True) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual([1, 82], output_tile.tile.swath_tile.time.shape)
+         self.assertEqual([1, 82], output_tile.tile.swath_tile.variable_data.shape)
+         self.assertEqual([1, 82], output_tile.tile.swath_tile.latitude.shape)
+         self.assertEqual([1, 82], output_tile.tile.swath_tile.longitude.shape)
+
+
+ class TestReadSmapData(unittest.TestCase):
+     """SwathReadingProcessor tests against a SMAP (smap_sss) HDF5 granule."""
+
+     def test_read_not_empty_smap(self):
+         """Read a phony_dim_0:0:38, phony_dim_1:0:1 window with time from row_time.
+
+         Expects a single time value and 38 x 1 variable, latitude and longitude arrays.
+         """
+         processor = SwathReadingProcessor(
+             variable_to_read='smap_sss',
+             latitude='lat',
+             longitude='lon',
+             time='row_time')
+         granule = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
+
+         tile_in = nexusproto.NexusTile()
+         tile_in.summary.granule = granule
+
+         # The HDF5 file has unnamed dimensions, which xarray exposes as phony_dim_N.
+         window = dict(phony_dim_0=slice(0, 38), phony_dim_1=slice(0, 1))
+
+         with xr.open_dataset(granule, decode_cf=True) as dataset:
+             tile_out = processor._generate_tile(dataset, window, tile_in)
+
+         self.assertEqual(granule, tile_out.summary.granule)
+         swath = tile_out.tile.swath_tile
+         self.assertEqual([1], swath.time.shape)
+         for shaped_array in (swath.variable_data, swath.latitude, swath.longitude):
+             self.assertEqual([38, 1], shaped_array.shape)
diff --git a/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py
new file mode 100644
index 0000000..90ae8bb
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py
@@ -0,0 +1,29 @@
+import unittest
+from collections import OrderedDict
+from os import path
+
+import xarray as xr
+
+from granule_ingester.processors.reading_processors import TileReadingProcessor
+
+
+ class TestEccoReadingProcessor(unittest.TestCase):
+     """Tests for the TileReadingProcessor._slices_for_variable helper.
+
+     NOTE(review): the class name says "Ecco", but it exercises the
+     TileReadingProcessor base helper (using an ECCO native-grid granule).
+     """
+
+     def test_slices_for_variable(self):
+         """Only dimensions the XC variable uses are kept from the requested mapping."""
+         requested = {
+             'j': slice(0, 1),
+             'tile': slice(0, 1),
+             'i': slice(0, 1),
+             'time': slice(0, 1)
+         }
+         # 'time' is expected to be dropped, presumably because XC has no time dimension.
+         wanted = {name: slice(0, 1, None) for name in ('tile', 'j', 'i')}
+
+         granule = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+         with xr.open_dataset(granule, decode_cf=True) as dataset:
+             actual = TileReadingProcessor._slices_for_variable(dataset['XC'], requested)
+             self.assertEqual(actual, wanted)
diff --git a/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..a936885
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import TimeSeriesReadingProcessor
+
+
+ class TestReadWSWMData(unittest.TestCase):
+     """TimeSeriesReadingProcessor tests against a WSWM Qout granule."""
+
+     def test_read_not_empty_wswm(self):
+         """Read time:0:5832, rivid:0:1 and check the time-series tile shapes."""
+         reading_processor = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', time='time')
+         granule_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+
+         input_tile = nexusproto.NexusTile()
+         input_tile.summary.granule = granule_path
+
+         dimensions_to_slices = {
+             'time': slice(0, 5832),
+             'rivid': slice(0, 1),
+         }
+         with xr.open_dataset(granule_path) as ds:
+             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+         self.assertEqual(granule_path, output_tile.summary.granule)
+         self.assertEqual([5832], output_tile.tile.time_series_tile.time.shape)
+         self.assertEqual([5832, 1], output_tile.tile.time_series_tile.variable_data.shape)
+         self.assertEqual([1], output_tile.tile.time_series_tile.latitude.shape)
+         self.assertEqual([1], output_tile.tile.time_series_tile.longitude.shape)
+
+         # NOTE: legacy assertions written against the older process()/section_spec
+         # interface, kept for reference; TODO port them to _generate_tile. Porting
+         # also needs numpy (np) and from_shaped_array, which this module does not
+         # currently import.
+         #
+         # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_wswm.nc')
+         # wswm_reader = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', 'time')
+         #
+         # input_tile = nexusproto.NexusTile()
+         # tile_summary = nexusproto.TileSummary()
+         # tile_summary.granule = "file:%s" % test_file
+         # tile_summary.section_spec = "time:0:5832,rivid:0:1"
+         # input_tile.summary.CopyFrom(tile_summary)
+         #
+         # results = list(wswm_reader.process(input_tile))
+         #
+         # self.assertEqual(1, len(results))
+         #
+         # for nexus_tile in results:
+         #     self.assertTrue(nexus_tile.HasField('tile'))
+         #     self.assertTrue(nexus_tile.tile.HasField('time_series_tile'))
+         #
+         #     tile = nexus_tile.tile.time_series_tile
+         #     self.assertEqual(1, from_shaped_array(tile.latitude).size)
+         #     self.assertEqual(1, from_shaped_array(tile.longitude).size)
+         #     self.assertEqual((5832, 1), from_shaped_array(tile.variable_data).shape)
+         #
+         # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data))
+         # self.assertEqual(5832, np.ma.count(tile1_data))
+         # self.assertAlmostEqual(45.837,
+         #                        np.ma.min(
+         #                            np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.latitude))),
+         #                        places=3)
+         # self.assertAlmostEqual(-122.789,
+         #                        np.ma.max(
+         #                            np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.longitude))),
+         #                        places=3)
+         #
+         # tile1_times = from_shaped_array(results[0].tile.time_series_tile.time)
+         # self.assertEqual(852098400, tile1_times[0])
+         # self.assertEqual(915073200, tile1_times[-1])
+         # self.assertAlmostEqual(1.473,
+         #                        np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data))[
+         #                            0, 0],
+         #                        places=3)
diff --git a/granule_ingester/tests/slicers/__init__.py b/granule_ingester/tests/slicers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/slicers/test_SliceFileByDimension.py b/granule_ingester/tests/slicers/test_SliceFileByDimension.py
new file mode 100644
index 0000000..e86442b
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByDimension.py
@@ -0,0 +1,122 @@
+# import unittest
+# from collections import Set
+#
+# from netCDF4 import Dataset
+# from granule_ingester.slicers.SliceFileByDimension import SliceFileByDimension
+#
+#
+# class TestSliceFileByTilesDesired(unittest.TestCase):
+#
+# def test_generate_slices(self):
+# netcdf_path = 'tests/granules/THETA_199201.nc'
+# dataset = Dataset(netcdf_path)
+# dimension_specs = {value.name: value.size for key,
+# value in dataset.dimensions.items()}
+#
+# slicer = SliceFileByDimension(slice_dimension='depth',
+# dimension_name_prefix=None)
+# slices = slicer.generate_slices(dimension_specs)
+# expected_slices = ['nv:0:2,time:0:1,longitude:0:720,depth:0:1,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:1:2,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:2:3,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:3:4,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:4:5,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:5:6,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:6:7,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:7:8,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:8:9,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:9:10,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:10:11,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:11:12,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:12:13,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:13:14,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:14:15,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:15:16,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:16:17,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:17:18,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:18:19,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:19:20,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:20:21,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:21:22,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:22:23,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:23:24,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:24:25,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:25:26,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:26:27,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:27:28,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:28:29,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:29:30,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:30:31,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:31:32,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:32:33,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:33:34,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:34:35,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:35:36,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:36:37,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:37:38,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:38:39,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:39:40,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:40:41,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:41:42,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:42:43,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:43:44,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:44:45,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:45:46,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:46:47,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:47:48,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:48:49,latitude:0:360',
+# 'nv:0:2,time:0:1,longitude:0:720,depth:49:50,latitude:0:360']
+#
+# self.assertEqual(slices, expected_slices)
+#
+# def test_generate_slices_indexed(self):
+# netcdf_path = 'tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5'
+# dataset = Dataset(netcdf_path)
+# dimension_specs = {value.name: value.size for key,
+# value in dataset.dimensions.items()}
+#
+# slicer = SliceFileByDimension(slice_dimension='2',
+# dimension_name_prefix='phony_dim_')
+# slices = slicer.generate_slices(dimension_specs)
+# expected_slices = [
+# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:0:1',
+# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:1:2',
+# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:2:3',
+# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:3:4'
+# ]
+#
+# self.assertEqual(slices, expected_slices)
+#
+# def test_indexed_dimension_slicing(self):
+# # for some reason, python automatically prefixes integer-indexed dimensions with "phony_dim_"
+# dimension_specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5}
+# slicer = SliceFileByDimension(slice_dimension='2',
+# dimension_name_prefix=None)
+# boundary_slices = slicer._indexed_dimension_slicing(dimension_specs)
+# expected_slices = [
+# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:0:1',
+# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:1:2',
+# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:2:3',
+# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:3:4',
+# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:4:5'
+# ]
+#
+# self.assertEqual(boundary_slices, expected_slices)
+#
+# def test_generate_tile_boundary_slices(self):
+# dimension_specs = {'lat': 8, 'lon': 8, 'depth': 5}
+# slicer = SliceFileByDimension(slice_dimension='depth',
+# dimension_name_prefix=None)
+# boundary_slices = slicer._generate_tile_boundary_slices(slicer._slice_by_dimension,dimension_specs)
+# expected_slices = [
+# 'lat:0:8,lon:0:8,depth:0:1',
+# 'lat:0:8,lon:0:8,depth:1:2',
+# 'lat:0:8,lon:0:8,depth:2:3',
+# 'lat:0:8,lon:0:8,depth:3:4',
+# 'lat:0:8,lon:0:8,depth:4:5'
+# ]
+#
+# self.assertEqual(boundary_slices, expected_slices)
+#
+# if __name__ == '__main__':
+# unittest.main()
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
new file mode 100644
index 0000000..7a8dd51
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -0,0 +1,105 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
+
+
+ class TestSliceFileByStepSize(unittest.TestCase):
+     """Tests for SliceFileByStepSize slice-string generation."""
+
+     @staticmethod
+     def _granule(name):
+         """Return the path of *name* inside the tests' ``granules`` directory."""
+         return path.join(path.dirname(__file__), '../granules', name)
+
+     def test_generate_slices(self):
+         """THETA_199201 split with 180-wide lat/lon steps and 2-deep depth steps gives 16 slices.
+
+         Dimensions appear in alphabetical order within each slice string.
+         """
+         steps = {'nv': 2, 'time': 1, 'latitude': 180, 'longitude': 180, 'depth': 2}
+         with xr.open_dataset(self._granule('THETA_199201.nc'), decode_cf=True) as dataset:
+             slicer = SliceFileByStepSize(dimension_step_sizes=steps)
+             actual = slicer._generate_slices(dimension_specs=dataset.dims)
+             expected = [
+                 'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                 'depth:0:2,latitude:180:360,longitude:540:720,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                 'depth:2:4,latitude:180:360,longitude:540:720,nv:0:2,time:0:1'
+             ]
+
+             self.assertEqual(expected, actual)
+
+     def test_generate_slices_indexed(self):
+         """The SMAP HDF5 granule's phony_dim_N dimensions are sliced like named ones."""
+         smap = self._granule('SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+         with xr.open_dataset(smap, decode_cf=True) as dataset:
+             steps = {'phony_dim_0': 76, 'phony_dim_1': 812, 'phony_dim_2': 1}
+             slicer = SliceFileByStepSize(dimension_step_sizes=steps)
+             actual = slicer._generate_slices(dimension_specs=dataset.dims)
+             expected = [
+                 'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:0:1',
+                 'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:1:2',
+                 'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:2:3',
+                 'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:3:4',
+                 'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:0:1',
+                 'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:1:2',
+                 'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:2:3',
+                 'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:3:4'
+             ]
+
+             self.assertEqual(actual, expected)
+
+     def test_generate_chunk_boundary_slices(self):
+         """A step that does not divide the size evenly yields a short final chunk (rivid 40:43)."""
+         specs = {'time': 5832, 'rivid': 43}
+         steps = {'time': 2916, 'rivid': 5}
+         chunks = SliceFileByStepSize(dimension_step_sizes=steps)._generate_chunk_boundary_slices(specs)
+         expected = [
+             'time:0:2916,rivid:0:5',
+             'time:0:2916,rivid:5:10',
+             'time:0:2916,rivid:10:15',
+             'time:0:2916,rivid:15:20',
+             'time:0:2916,rivid:20:25',
+             'time:0:2916,rivid:25:30',
+             'time:0:2916,rivid:30:35',
+             'time:0:2916,rivid:35:40',
+             'time:0:2916,rivid:40:43',
+             'time:2916:5832,rivid:0:5',
+             'time:2916:5832,rivid:5:10',
+             'time:2916:5832,rivid:10:15',
+             'time:2916:5832,rivid:15:20',
+             'time:2916:5832,rivid:20:25',
+             'time:2916:5832,rivid:25:30',
+             'time:2916:5832,rivid:30:35',
+             'time:2916:5832,rivid:35:40',
+             'time:2916:5832,rivid:40:43',
+         ]
+
+         self.assertEqual(chunks, expected)
+
+     def test_generate_chunk_boundary_slices_indexed(self):
+         """Uneven steps over three phony_dim_N dimensions yield 8 slices.
+
+         NOTE(review): despite its name this test calls _generate_slices, not
+         _generate_chunk_boundary_slices; confirm which one was intended.
+         """
+         steps = {'phony_dim_0': 4, 'phony_dim_1': 4, 'phony_dim_2': 3}
+         specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5}
+         chunks = SliceFileByStepSize(dimension_step_sizes=steps)._generate_slices(specs)
+         expected = [
+             'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:0:3',
+             'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:3:5',
+             'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:0:3',
+             'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:3:5',
+             'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:0:3',
+             'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:3:5',
+             'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:0:3',
+             'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:3:5',
+         ]
+
+         self.assertEqual(chunks, expected)
+
+
+ if __name__ == '__main__':
+     unittest.main()
diff --git a/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
new file mode 100644
index 0000000..772c63e
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
@@ -0,0 +1,88 @@
+# import unittest
+# from collections import Set
+#
+# from netCDF4 import Dataset
+# from granule_ingester.slicers.SliceFileByTilesDesired import SliceFileByTilesDesired
+#
+#
+# class TestSliceFileByTilesDesired(unittest.TestCase):
+#
+# def test_generate_slices(self):
+# netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+# dataset = Dataset(netcdf_path)
+# dimension_specs = {value.name: value.size for key,
+# value in dataset.dimensions.items()}
+#
+# slicer = SliceFileByTilesDesired(tiles_desired=2,
+# desired_spatial_dimensions=['lat', 'lon'])
+# slices = slicer.generate_slices(dimension_specs)
+# expected_slices = ['lat:0:509,lon:0:1018',
+# 'lat:0:509,lon:1018:1440',
+# 'lat:509:720,lon:0:1018',
+# 'lat:509:720,lon:1018:1440']
+# self.assertEqual(slices, expected_slices)
+#
+# def test_generate_slices_with_time(self):
+# netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+# dataset = Dataset(netcdf_path)
+# dimension_specs = {value.name: value.size for key,
+# value in dataset.dimensions.items()}
+#
+# slicer = SliceFileByTilesDesired(tiles_desired=2,
+# desired_spatial_dimensions=[
+# 'lat', 'lon'],
+# time_dimension=('time', 3))
+# slices = slicer.generate_slices(dimension_specs)
+# expected_slices = ['time:0:1,lat:0:509,lon:0:1018',
+# 'time:1:2,lat:0:509,lon:0:1018',
+#
+# 'time:0:1,lat:0:509,lon:1018:1440',
+# 'time:1:2,lat:0:509,lon:1018:1440',
+#
+# 'time:0:1,lat:509:720,lon:0:1018',
+# 'time:1:2,lat:509:720,lon:0:1018',
+#
+# 'time:0:1,lat:509:720,lon:1018:1440',
+# 'time:1:2,lat:509:720,lon:1018:1440']
+# self.assertEqual(slices, expected_slices)
+#
+# def test_calculate_step_size_perfect_split_2dim(self):
+# step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 2)
+# self.assertAlmostEqual(step_size, 100.0)
+#
+# def test_calculate_step_size_perfect_split_3dim(self):
+# step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 3)
+# self.assertAlmostEqual(step_size, 215.0)
+#
+# def test_generate_spatial_slices(self):
+# dimension_specs = {'lat': 8, 'lon': 8}
+# slicer = SliceFileByTilesDesired(tiles_desired=2,
+# desired_spatial_dimensions=dimension_specs)
+# boundary_slices = slicer._generate_spatial_slices(tiles_desired=4,
+# dimension_specs=dimension_specs)
+# expected_slices = [
+# 'lat:0:4,lon:0:4',
+# 'lat:0:4,lon:4:8',
+# 'lat:4:8,lon:0:4',
+# 'lat:4:8,lon:4:8'
+# ]
+# self.assertEqual(boundary_slices, expected_slices)
+#
+# def test_generate_temporal_slices(self):
+# slicer = SliceFileByTilesDesired(tiles_desired=2,
+# desired_spatial_dimensions=None)
+# time_slices = slicer._generate_temporal_slices(('time', 10))
+# expected_time_slices = ['time:0:1',
+# 'time:1:2',
+# 'time:2:3',
+# 'time:3:4',
+# 'time:4:5',
+# 'time:5:6',
+# 'time:6:7',
+# 'time:7:8',
+# 'time:8:9']
+# self.assertEqual(time_slices, expected_time_slices)
+#
+#
+# if __name__ == '__main__':
+# unittest.main()
diff --git a/granule_ingester/tests/slicers/test_TileSlicer.py b/granule_ingester/tests/slicers/test_TileSlicer.py
new file mode 100644
index 0000000..c3ad97f
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_TileSlicer.py
@@ -0,0 +1,68 @@
+import asyncio
+import os
+import unittest
+from granule_ingester.slicers.TileSlicer import TileSlicer
+import xarray as xr
+
+
+class TestTileSlicer(unittest.TestCase):
+    class ToyTileSlicer(TileSlicer):  # stub: fixed 12 specs (3 time steps x 2 lat ranges x 2 lon ranges)
+        def _generate_slices(self, dimensions):  # ignores `dimensions`; always returns the same list
+            return [
+                'time:0:1,lat:0:4,lon:0:4',
+                'time:1:2,lat:0:4,lon:0:4',
+                'time:2:3,lat:0:4,lon:0:4',
+
+                'time:0:1,lat:0:4,lon:4:8',
+                'time:1:2,lat:0:4,lon:4:8',
+                'time:2:3,lat:0:4,lon:4:8',
+
+                'time:0:1,lat:4:8,lon:0:4',
+                'time:1:2,lat:4:8,lon:0:4',
+                'time:2:3,lat:4:8,lon:0:4',
+
+                'time:0:1,lat:4:8,lon:4:8',
+                'time:1:2,lat:4:8,lon:4:8',
+                'time:2:3,lat:4:8,lon:4:8'
+            ]
+
+    def test_generate_tiles(self):  # generate_tiles() should record the granule path and the slicer's spec list
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertEqual(file_path, slicer._granule_name)
+        self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    # def test_open_s3(self):  NOTE(review): appears to target an older resource=/open() API than the live tests
+    #     s3_path = 's3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=s3_path)
+    #
+    #     expected_slices = slicer._generate_slices(None)
+    #     asyncio.run(slicer.open())
+    #     self.assertIsNotNone(slicer.dataset)
+    #     self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    def test_next(self):  # iterating the slicer yields one tile per spec, each tagged with the granule path
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+            generated_tiles = list(slicer)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertListEqual(expected_slices, [tile.summary.section_spec for tile in generated_tiles])
+        for tile in generated_tiles:
+            self.assertEqual(file_path, tile.summary.granule)
+
+    # def test_download_s3_file(self):
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=None)
+    #
+    #     asyncio.run(slicer._download_s3_file(
+    #         "s3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc"))
+
+
+if __name__ == '__main__':  # allow running this test module directly as a script
+    unittest.main()
diff --git a/granule_ingester/tests/writers/__init__.py b/granule_ingester/tests/writers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py
new file mode 100644
index 0000000..76b85ac
--- /dev/null
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -0,0 +1,54 @@
+import asyncio
+import unittest
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.writers import SolrStore
+
+
+class TestSolrStore(unittest.TestCase):
+    """Checks how SolrStore._build_solr_doc maps NexusTile summary fields onto Solr document keys."""
+    def test_build_solr_doc(self):
+        tile = nexusproto.NexusTile()
+        tile.summary.tile_id = 'test_id'
+        tile.summary.dataset_name = 'test_dataset'
+        tile.summary.dataset_uuid = 'test_dataset_id'
+        tile.summary.data_var_name = 'test_variable'
+        tile.summary.granule = 'test_granule_path'
+        tile.summary.section_spec = 'time:0:1,j:0:20,i:200:240'
+        tile.summary.bbox.lat_min = -180.1  # NOTE(review): |lat| > 90 here (lat/lon magnitudes look swapped); fine for a formatting test, but confirm intent
+        tile.summary.bbox.lat_max = 180.2
+        tile.summary.bbox.lon_min = -90.5
+        tile.summary.bbox.lon_max = 90.0
+        tile.summary.stats.min = -10.0
+        tile.summary.stats.max = 25.5
+        tile.summary.stats.mean = 12.5
+        tile.summary.stats.count = 100
+        tile.summary.stats.min_time = 694224000  # epoch seconds for 1992-01-01T00:00:00Z
+        tile.summary.stats.max_time = 694310400  # one day later: 1992-01-02T00:00:00Z
+        # The ECCO tile payload supplies the value asserted as 'tile_depth' below.
+        tile.tile.ecco_tile.depth = 10.5
+
+        metadata_store = SolrStore()
+        solr_doc = metadata_store._build_solr_doc(tile)
+
+        self.assertEqual('sea_surface_temp', solr_doc['table_s'])
+        self.assertEqual(
+            'POLYGON((-90.500 -180.100, 90.000 -180.100, 90.000 180.200, -90.500 180.200, -90.500 -180.100))',
+            solr_doc['geo'])
+        self.assertEqual('test_id', solr_doc['id'])
+        self.assertEqual('test_dataset!test_id', solr_doc['solr_id_s'])
+        self.assertEqual('time:0:1,j:0:20,i:200:240', solr_doc['sectionSpec_s'])
+        self.assertEqual('test_granule_path', solr_doc['granule_s'])
+        self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
+        self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
+        self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
+        self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'])
+        self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'])
+        self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
+        self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
+        self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
+        self.assertAlmostEqual(25.5, solr_doc['tile_max_val_d'])
+        self.assertAlmostEqual(12.5, solr_doc['tile_avg_val_d'])
+        self.assertEqual(100, solr_doc['tile_count_i'])
+        self.assertAlmostEqual(10.5, solr_doc['tile_depth'])