Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/23 00:49:44 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-245: Move granule ingester code into this repo (#2)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 08093ee  SDAP-245: Move granule ingester code into this repo (#2)
08093ee is described below

commit 08093ee15346713decdc33afab23790f2aee8a76
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Jun 22 17:49:38 2020 -0700

    SDAP-245: Move granule ingester code into this repo (#2)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 .gitattributes                                     |   1 -
 granule_ingester/.gitignore                        |   9 +
 granule_ingester/README.md                         |  34 +++
 granule_ingester/conda-requirements.txt            |  10 +
 granule_ingester/docker/Dockerfile                 |  21 ++
 granule_ingester/docker/entrypoint.sh              |  10 +
 granule_ingester/docker/install_nexusproto.sh      |  20 ++
 granule_ingester/granule_ingester/__init__.py      |   0
 .../granule_ingester/consumer/Consumer.py          |  88 +++++++
 .../granule_ingester/consumer/__init__.py          |   1 +
 .../granule_loaders/GranuleLoader.py               |  71 ++++++
 .../granule_ingester/granule_loaders/__init__.py   |   1 +
 .../granule_ingester/healthcheck/HealthCheck.py    |  22 ++
 .../granule_ingester/healthcheck/__init__.py       |   1 +
 granule_ingester/granule_ingester/main.py          | 118 +++++++++
 .../granule_ingester/pipeline/Modules.py           |  15 ++
 .../granule_ingester/pipeline/Pipeline.py          | 158 ++++++++++++
 .../granule_ingester/pipeline/__init__.py          |   2 +
 .../granule_ingester/processors/EmptyTileFilter.py |  42 ++++
 .../granule_ingester/processors/GenerateTileId.py  |  32 +++
 .../granule_ingester/processors/TileProcessor.py   |  23 ++
 .../processors/TileSummarizingProcessor.py         |  98 ++++++++
 .../granule_ingester/processors/__init__.py        |   5 +
 .../granule_ingester/processors/kelvintocelsius.py |  31 +++
 .../reading_processors/EccoReadingProcessor.py     |  64 +++++
 .../reading_processors/GridReadingProcessor.py     |  53 +++++
 .../reading_processors/SwathReadingProcessor.py    |  47 ++++
 .../reading_processors/TileReadingProcessor.py     |  81 +++++++
 .../TimeSeriesReadingProcessor.py                  |  83 +++++++
 .../processors/reading_processors/__init__.py      |   5 +
 .../slicers/SliceFileByDimension.py                |  55 +++++
 .../slicers/SliceFileByStepSize.py                 |  55 +++++
 .../slicers/SliceFileByTilesDesired.py             |  68 ++++++
 .../granule_ingester/slicers/TileSlicer.py         |  56 +++++
 .../granule_ingester/slicers/__init__.py           |   2 +
 .../granule_ingester/writers/CassandraStore.py     |  78 ++++++
 .../granule_ingester/writers/DataStore.py          |  13 +
 .../granule_ingester/writers/MetadataStore.py      |  11 +
 .../granule_ingester/writers/SolrStore.py          | 152 ++++++++++++
 .../granule_ingester/writers/__init__.py           |   4 +
 granule_ingester/requirements.txt                  |   3 +
 granule_ingester/setup.py                          |  34 +++
 granule_ingester/tests/__init__.py                 |   0
 .../tests/config_files/analysed_sst.yml            |  16 ++
 .../config_files/ingestion_config_testfile.yaml    |  17 ++
 ...4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc | Bin 0 -> 1057327 bytes
 granule_ingester/tests/granules/OBP_2017_01.nc     | Bin 0 -> 2110135 bytes
 granule_ingester/tests/granules/OBP_native_grid.nc | Bin 0 -> 1285094 bytes
 .../SMAP_L2B_SSS_04892_20160101T005507_R13080.h5   | Bin 0 -> 18672352 bytes
 granule_ingester/tests/granules/THETA_199201.nc    | Bin 0 -> 4255957 bytes
 granule_ingester/tests/granules/empty_mur.nc4      | Bin 0 -> 60937 bytes
 .../tests/granules/not_empty_ascatb.nc4            | Bin 0 -> 78036 bytes
 .../tests/granules/not_empty_avhrr.nc4             | Bin 0 -> 49511 bytes
 granule_ingester/tests/granules/not_empty_ccmp.nc  | Bin 0 -> 206870 bytes
 granule_ingester/tests/granules/not_empty_mur.nc4  | Bin 0 -> 60907 bytes
 granule_ingester/tests/granules/not_empty_smap.h5  | Bin 0 -> 3000192 bytes
 granule_ingester/tests/granules/not_empty_wswm.nc  | Bin 0 -> 1041568 bytes
 granule_ingester/tests/pipeline/__init__.py        |   0
 granule_ingester/tests/pipeline/test_Pipeline.py   | 104 ++++++++
 granule_ingester/tests/processors/__init__.py      |   0
 .../tests/processors/test_GenerateTileId.py        |  22 ++
 .../tests/reading_processors/__init__.py           |   0
 .../test_EccoReadingProcessor.py                   |  64 +++++
 .../test_GridReadingProcessor.py                   | 265 +++++++++++++++++++++
 .../test_SwathReadingProcessor.py                  |  74 ++++++
 .../test_TileReadingProcessor.py                   |  29 +++
 .../test_TimeSeriesReadingProcessor.py             |  86 +++++++
 granule_ingester/tests/slicers/__init__.py         |   0
 .../tests/slicers/test_SliceFileByDimension.py     | 122 ++++++++++
 .../tests/slicers/test_SliceFileByStepSize.py      | 105 ++++++++
 .../tests/slicers/test_SliceFileByTilesDesired.py  |  88 +++++++
 granule_ingester/tests/slicers/test_TileSlicer.py  |  68 ++++++
 granule_ingester/tests/writers/__init__.py         |   0
 granule_ingester/tests/writers/test_SolrStore.py   |  54 +++++
 74 files changed, 2790 insertions(+), 1 deletion(-)

diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index 9abd205..0000000
--- a/.gitattributes
+++ /dev/null
@@ -1 +0,0 @@
-*.nc filter=lfs diff=lfs merge=lfs -text
diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
new file mode 100644
index 0000000..5408b74
--- /dev/null
+++ b/granule_ingester/.gitignore
@@ -0,0 +1,9 @@
+.vscode
+.idea
+*.egg-info
+*__pycache__
+*.pytest_cache
+*.code-workspace
+.DS_STORE
+build
+dist
\ No newline at end of file
diff --git a/granule_ingester/README.md b/granule_ingester/README.md
new file mode 100644
index 0000000..112f52d
--- /dev/null
+++ b/granule_ingester/README.md
@@ -0,0 +1,34 @@
+# SDAP Granule Ingester
+
+The SDAP Granule Ingester is a service that consumes YAML-formatted string
+messages from a RabbitMQ queue, produced by the Collection Manager
+(`/collection_manager` in this repo). For each message consumed, this service
+reads a granule file from disk and ingests it into SDAP by processing the
+granule and writing the resulting data to Cassandra and Solr.
+
+
+## Prerequisites
+
+Python 3.7
+
+## Building the service
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+    $ python setup.py install
+    
+
+## Launching the service
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+    $ python granule_ingester/main.py -h
+    
+## Running the tests
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+    $ pip install pytest
+    $ pytest
+    
+## Building the Docker image
+From `incubator-sdap-ingester/granule_ingester`, run:
+
+    $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester
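For reference, here is a minimal sketch of the kind of YAML message this service consumes. The top-level keys and module names come from pipeline/Pipeline.py and pipeline/Modules.py added later in this commit; the slicer's `dimension_step_sizes` argument and the variable/dimension names are illustrative assumptions:

    granule:
      resource: tests/granules/not_empty_avhrr.nc4
    slicer:
      name: sliceFileByStepSize
      dimension_step_sizes:   # argument name is an assumption
        lat: 30
        lon: 30
    processors:
      - name: GridReadingProcessor
        variable_to_read: analysed_sst
        latitude: lat
        longitude: lon
        time: time
      - name: emptyTileFilter
      - name: generateTileId
      - name: tileSummary
        dataset_name: MY_DATASET   # placeholder dataset name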
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
new file mode 100644
index 0000000..b2af149
--- /dev/null
+++ b/granule_ingester/conda-requirements.txt
@@ -0,0 +1,10 @@
+numpy==1.15.4
+scipy
+netcdf4==1.5.3
+pytz==2019.3
+xarray
+pyyaml==5.3.1
+requests==2.23.0
+aiohttp==3.6.2
+aio-pika
+tenacity
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
new file mode 100644
index 0000000..4b25318
--- /dev/null
+++ b/granule_ingester/docker/Dockerfile
@@ -0,0 +1,21 @@
+FROM continuumio/miniconda3:4.8.2-alpine
+
+USER root
+
+ENV PATH="/opt/conda/bin:$PATH"
+
+RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8
+
+COPY /granule_ingester /sdap/granule_ingester
+COPY /setup.py /sdap/setup.py
+COPY /requirements.txt /sdap/requirements.txt
+COPY /conda-requirements.txt /sdap/conda-requirements.txt
+COPY /docker/install_nexusproto.sh /install_nexusproto.sh
+COPY /docker/entrypoint.sh /entrypoint.sh
+
+RUN ./install_nexusproto.sh
+RUN cd /sdap && python setup.py install
+
+RUN apk del .build-deps
+
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
new file mode 100644
index 0000000..e6f7262
--- /dev/null
+++ b/granule_ingester/docker/entrypoint.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+python /sdap/granule_ingester/main.py \
+  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
+  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \
+  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \
+  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
+  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
+  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
diff --git a/granule_ingester/docker/install_nexusproto.sh b/granule_ingester/docker/install_nexusproto.sh
new file mode 100755
index 0000000..7ba2cee
--- /dev/null
+++ b/granule_ingester/docker/install_nexusproto.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+set -e
+
+APACHE_NEXUSPROTO="https://github.com/apache/incubator-sdap-nexusproto.git"
+MASTER="master"
+
+GIT_REPO=${1:-$APACHE_NEXUSPROTO}
+GIT_BRANCH=${2:-$MASTER}
+
+mkdir nexusproto
+cd nexusproto
+git init
+git pull ${GIT_REPO} ${GIT_BRANCH}
+
+./gradlew pythonInstall --info
+
+./gradlew install --info
+
+rm -rf /root/.gradle
+cd ..
+rm -rf nexusproto
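The script accepts an optional git repo URL and branch as positional arguments, falling back to the Apache nexusproto master branch. For example, to build against a fork (URL and branch are placeholders):

    $ ./install_nexusproto.sh https://github.com/myfork/incubator-sdap-nexusproto.git my-feature-branch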
diff --git a/granule_ingester/granule_ingester/__init__.py b/granule_ingester/granule_ingester/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
new file mode 100644
index 0000000..75d347a
--- /dev/null
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import aio_pika
+
+from granule_ingester.healthcheck import HealthCheck
+from granule_ingester.pipeline import Pipeline
+
+logger = logging.getLogger(__name__)
+
+
+class Consumer(HealthCheck):
+
+    def __init__(self,
+                 rabbitmq_host,
+                 rabbitmq_username,
+                 rabbitmq_password,
+                 rabbitmq_queue,
+                 data_store_factory,
+                 metadata_store_factory):
+        self._rabbitmq_queue = rabbitmq_queue
+        self._data_store_factory = data_store_factory
+        self._metadata_store_factory = metadata_store_factory
+
+        self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
+                                                                                password=rabbitmq_password,
+                                                                                host=rabbitmq_host)
+        self._connection = None
+
+    async def health_check(self) -> bool:
+        try:
+            connection = await self._get_connection()
+            await connection.close()
+            return True
+        except Exception:
+            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
+            return False
+
+    async def _get_connection(self):
+        return await aio_pika.connect_robust(self._connection_string)
+
+    async def __aenter__(self):
+        self._connection = await self._get_connection()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._connection:
+            await self._connection.close()
+
+    @staticmethod
+    async def _received_message(message: aio_pika.IncomingMessage,
+                                data_store_factory,
+                                metadata_store_factory):
+        logger.info("Received a job from the queue. Starting pipeline.")
+        try:
+            config_str = message.body.decode("utf-8")
+            logger.debug(config_str)
+            pipeline = Pipeline.from_string(config_str=config_str,
+                                            data_store_factory=data_store_factory,
+                                            metadata_store_factory=metadata_store_factory)
+            await pipeline.run()
+            message.ack()
+        except Exception as e:
+            message.reject(requeue=True)
+            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+
+    async def start_consuming(self):
+        channel = await self._connection.channel()
+        await channel.set_qos(prefetch_count=1)
+        queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
+
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
diff --git a/granule_ingester/granule_ingester/consumer/__init__.py b/granule_ingester/granule_ingester/consumer/__init__.py
new file mode 100644
index 0000000..35d075b
--- /dev/null
+++ b/granule_ingester/granule_ingester/consumer/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.consumer.Consumer import Consumer
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
new file mode 100644
index 0000000..c28ffbb
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+from urllib import parse
+
+import aioboto3
+import xarray as xr
+
+logger = logging.getLogger(__name__)
+
+
+class GranuleLoader:
+
+    def __init__(self, resource: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._granule_temp_file = None
+        self._resource = resource
+
+    async def __aenter__(self):
+        return await self.open()
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._granule_temp_file:
+            self._granule_temp_file.close()
+
+    async def open(self) -> (xr.Dataset, str):
+        resource_url = parse.urlparse(self._resource)
+        if resource_url.scheme == 's3':
+            # We need to save a reference to the temporary granule file so we can delete it when the context manager
+            # closes. The file needs to be kept around until nothing is reading the dataset anymore.
+            self._granule_temp_file = await self._download_s3_file(self._resource)
+            file_path = self._granule_temp_file.name
+        elif resource_url.scheme == '':
+            file_path = self._resource
+        else:
+            raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))
+
+        granule_name = os.path.basename(self._resource)
+        return xr.open_dataset(file_path, lock=False), granule_name
+
+    @staticmethod
+    async def _download_s3_file(url: str):
+        parsed_url = parse.urlparse(url)
+        logger.info(
+            "Downloading S3 file from bucket '{}' with key '{}'".format(parsed_url.hostname, parsed_url.path[1:]))
+        async with aioboto3.resource("s3") as s3:
+            obj = await s3.Object(bucket_name=parsed_url.hostname, key=parsed_url.path[1:])
+            response = await obj.get()
+            data = await response['Body'].read()
+            logger.info("Finished downloading S3 file.")
+
+        fp = tempfile.NamedTemporaryFile()
+        fp.write(data)
+        logger.info("Saved downloaded file to {}.".format(fp.name))
+        return fp
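GranuleLoader is used as an async context manager (see Pipeline.run below): local paths are opened in place, while s3:// resources are downloaded to a temporary file that is deleted on exit. A minimal usage sketch, using one of the test granules added in this commit:

    import asyncio
    from granule_ingester.granule_loaders import GranuleLoader

    async def peek(resource):
        async with GranuleLoader(resource) as (dataset, granule_name):
            # granule_name is the basename of the resource; dataset is an xarray Dataset
            print(granule_name, list(dataset.data_vars))

    asyncio.run(peek("tests/granules/not_empty_mur.nc4"))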
diff --git a/granule_ingester/granule_ingester/granule_loaders/__init__.py b/granule_ingester/granule_ingester/granule_loaders/__init__.py
new file mode 100644
index 0000000..5df1cb0
--- /dev/null
+++ b/granule_ingester/granule_ingester/granule_loaders/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.granule_loaders.GranuleLoader import GranuleLoader
diff --git a/granule_ingester/granule_ingester/healthcheck/HealthCheck.py b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py
new file mode 100644
index 0000000..390c573
--- /dev/null
+++ b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+class HealthCheck(ABC):
+    @abstractmethod
+    async def health_check(self) -> bool:
+        pass
diff --git a/granule_ingester/granule_ingester/healthcheck/__init__.py b/granule_ingester/granule_ingester/healthcheck/__init__.py
new file mode 100644
index 0000000..f343c01
--- /dev/null
+++ b/granule_ingester/granule_ingester/healthcheck/__init__.py
@@ -0,0 +1 @@
+from granule_ingester.healthcheck.HealthCheck import HealthCheck
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
new file mode 100644
index 0000000..29795f7
--- /dev/null
+++ b/granule_ingester/granule_ingester/main.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+from functools import partial
+from typing import List
+
+from granule_ingester.consumer import Consumer
+from granule_ingester.healthcheck import HealthCheck
+from granule_ingester.writers import CassandraStore
+from granule_ingester.writers import SolrStore
+
+
+def cassandra_factory(contact_points, port):
+    store = CassandraStore(contact_points, port)
+    store.connect()
+    return store
+
+
+def solr_factory(solr_host_and_port):
+    store = SolrStore(solr_host_and_port)
+    store.connect()
+    return store
+
+
+async def run_health_checks(dependencies: List[HealthCheck]):
+    for dependency in dependencies:
+        if not await dependency.health_check():
+            return False
+    return True
+
+
+async def main():
+    parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion jobs and process them.')
+    parser.add_argument('--rabbitmq_host',
+                        default='localhost',
+                        metavar='HOST',
+                        help='RabbitMQ hostname to connect to. (Default: "localhost")')
+    parser.add_argument('--rabbitmq_username',
+                        default='guest',
+                        metavar='USERNAME',
+                        help='RabbitMQ username. (Default: "guest")')
+    parser.add_argument('--rabbitmq_password',
+                        default='guest',
+                        metavar='PASSWORD',
+                        help='RabbitMQ password. (Default: "guest")')
+    parser.add_argument('--rabbitmq_queue',
+                        default="nexus",
+                        metavar="QUEUE",
+                        help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+    parser.add_argument('--cassandra_contact_points',
+                        default=['localhost'],
+                        metavar="HOST",
+                        nargs='+',
+                        help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")')
+    parser.add_argument('--cassandra_port',
+                        default=9042,
+                        metavar="PORT",
+                        help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--solr_host_and_port',
+                        default='http://localhost:8983',
+                        metavar='HOST:PORT',
+                        help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('-v',
+                        '--verbose',
+                        action='store_true',
+                        help='Print verbose logs.')
+
+    args = parser.parse_args()
+
+    logging_level = logging.DEBUG if args.verbose else logging.INFO
+    logging.basicConfig(level=logging_level)
+    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
+    for logger in loggers:
+        logger.setLevel(logging_level)
+
+    logger = logging.getLogger(__name__)
+
+    config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)])
+    logger.info("Using configuration values:\n{}".format(config_values_str))
+
+    cassandra_contact_points = args.cassandra_contact_points
+    cassandra_port = args.cassandra_port
+    solr_host_and_port = args.solr_host_and_port
+
+    consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
+                        rabbitmq_username=args.rabbitmq_username,
+                        rabbitmq_password=args.rabbitmq_password,
+                        rabbitmq_queue=args.rabbitmq_queue,
+                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
+    if await run_health_checks(
+            [CassandraStore(cassandra_contact_points, cassandra_port),
+             SolrStore(solr_host_and_port),
+             consumer]):
+        async with consumer:
+            logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+            await consumer.start_consuming()
+    else:
+        logger.error("Quitting because not all dependencies passed the health checks.")
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
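All flags are optional and default to services on localhost; a typical invocation against remote dependencies might look like this (hostnames are placeholders):

    $ python granule_ingester/main.py \
        --rabbitmq_host rabbitmq.example.com \
        --cassandra_contact_points cass1 cass2 \
        --cassandra_port 9042 \
        --solr_host_and_port http://solr.example.com:8983 \
        -v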
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
new file mode 100644
index 0000000..2cf2245
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -0,0 +1,15 @@
+from granule_ingester.processors import *
+from granule_ingester.processors.reading_processors import *
+from granule_ingester.slicers import *
+from granule_ingester.granule_loaders import *
+
+modules = {
+    "granule": GranuleLoader,
+    "sliceFileByStepSize": SliceFileByStepSize,
+    "generateTileId": GenerateTileId,
+    "EccoReadingProcessor": EccoReadingProcessor,
+    "GridReadingProcessor": GridReadingProcessor,
+    "tileSummary": TileSummarizingProcessor,
+    "emptyTileFilter": EmptyTileFilter,
+    "kelvinToCelsius": KelvinToCelsius
+}
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
new file mode 100644
index 0000000..8f2dd6f
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import time
+from typing import List
+
+import aiomultiprocess
+import xarray as xr
+import yaml
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.granule_loaders import GranuleLoader
+from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.processors.TileProcessor import TileProcessor
+from granule_ingester.slicers import TileSlicer
+from granule_ingester.writers import DataStore, MetadataStore
+
+logger = logging.getLogger(__name__)
+
+MAX_QUEUE_SIZE = 2 ** 15 - 1
+
+_worker_data_store: DataStore = None
+_worker_metadata_store: MetadataStore = None
+_worker_processor_list: List[TileProcessor] = None
+_worker_dataset = None
+
+
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+    global _worker_data_store
+    global _worker_metadata_store
+    global _worker_processor_list
+    global _worker_dataset
+
+    # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
+    # however, the OS closes these sockets automatically when the worker processes exit, so they are not leaked.
+    _worker_data_store = data_store_factory()
+    _worker_metadata_store = metadata_store_factory()
+    _worker_processor_list = processor_list
+    _worker_dataset = dataset
+
+
+async def _process_tile_in_worker(serialized_input_tile: str):
+    global _worker_data_store
+    global _worker_metadata_store
+    global _worker_processor_list
+    global _worker_dataset
+
+    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+    if processed_tile:
+        await _worker_data_store.save_data(processed_tile)
+        await _worker_metadata_store.save_metadata(processed_tile)
+
+
+def _recurse(processor_list: List[TileProcessor],
+             dataset: xr.Dataset,
+             input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
+    if len(processor_list) == 0:
+        return input_tile
+    output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
+    return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None
+
+
+class Pipeline:
+    def __init__(self,
+                 granule_loader: GranuleLoader,
+                 slicer: TileSlicer,
+                 data_store_factory,
+                 metadata_store_factory,
+                 tile_processors: List[TileProcessor]):
+        self._granule_loader = granule_loader
+        self._tile_processors = tile_processors
+        self._slicer = slicer
+        self._data_store_factory = data_store_factory
+        self._metadata_store_factory = metadata_store_factory
+
+    @classmethod
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
+        config = yaml.load(config_str, yaml.FullLoader)
+        return cls._build_pipeline(config,
+                                   data_store_factory,
+                                   metadata_store_factory,
+                                   processor_module_mappings)
+
+    @classmethod
+    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
+        with open(config_path) as config_file:
+            config = yaml.load(config_file, yaml.FullLoader)
+            return cls._build_pipeline(config,
+                                       data_store_factory,
+                                       metadata_store_factory,
+                                       processor_module_mappings)
+
+    @classmethod
+    def _build_pipeline(cls,
+                        config: dict,
+                        data_store_factory,
+                        metadata_store_factory,
+                        module_mappings: dict):
+        granule_loader = GranuleLoader(**config['granule'])
+
+        slicer_config = config['slicer']
+        slicer = cls._parse_module(slicer_config, module_mappings)
+
+        tile_processors = []
+        for processor_config in config['processors']:
+            module = cls._parse_module(processor_config, module_mappings)
+            tile_processors.append(module)
+
+        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+
+    @classmethod
+    def _parse_module(cls, module_config: dict, module_mappings: dict):
+        module_name = module_config.pop('name')
+        try:
+            module_class = module_mappings[module_name]
+            logger.debug("Loaded processor {}.".format(module_class))
+            processor_module = module_class(**module_config)
+        except KeyError:
+            raise RuntimeError("'{}' is not a valid processor.".format(module_name))
+
+        return processor_module
+
+    async def run(self):
+        async with self._granule_loader as (dataset, granule_name):
+            start = time.perf_counter()
+            async with aiomultiprocess.Pool(initializer=_init_worker,
+                                            initargs=(self._tile_processors,
+                                                      dataset,
+                                                      self._data_store_factory,
+                                                      self._metadata_store_factory)) as pool:
+                serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
+                                    self._slicer.generate_tiles(dataset, granule_name)]
+                # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
+                # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
+                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+                    await pool.map(_process_tile_in_worker, chunk)
+
+        end = time.perf_counter()
+        logger.info("Pipeline finished in {} seconds".format(end - start))
+
+    @staticmethod
+    def _chunk_list(items, chunk_size):
+        return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
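_chunk_list splits the serialized tiles into batches of at most MAX_QUEUE_SIZE (2**15 - 1 = 32767) items, so each pool.map call stays under the multiprocessing queue limit noted above. For example:

    >>> Pipeline._chunk_list([1, 2, 3, 4, 5], chunk_size=2)
    [[1, 2], [3, 4], [5]]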
diff --git a/granule_ingester/granule_ingester/pipeline/__init__.py b/granule_ingester/granule_ingester/pipeline/__init__.py
new file mode 100644
index 0000000..7346aa7
--- /dev/null
+++ b/granule_ingester/granule_ingester/pipeline/__init__.py
@@ -0,0 +1,2 @@
+from granule_ingester.pipeline.Pipeline import Pipeline
+from granule_ingester.pipeline.Modules import modules
diff --git a/granule_ingester/granule_ingester/processors/EmptyTileFilter.py b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
new file mode 100644
index 0000000..4f012f5
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+logger = logging.getLogger(__name__)
+
+
+def parse_input(nexus_tile_data):
+    return nexusproto.NexusTile.FromString(nexus_tile_data)
+
+
+class EmptyTileFilter(TileProcessor):
+    def process(self, tile, *args, **kwargs):
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+        data = from_shaped_array(tile_data.variable_data)
+
+        # Only keep the tile if it contains at least one non-NaN value
+        if data.size - numpy.count_nonzero(numpy.isnan(data)) > 0:
+            return tile
+        else:
+            logger.warning("Discarding tile from {} because it is empty".format(tile.summary.granule))
+        return None
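The emptiness test keeps a tile only if at least one element is non-NaN; the check reduces to counting NaNs against the array size:

    >>> import numpy
    >>> data = numpy.array([numpy.nan, numpy.nan, 1.0])
    >>> data.size - numpy.count_nonzero(numpy.isnan(data)) > 0
    True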
diff --git a/granule_ingester/granule_ingester/processors/GenerateTileId.py b/granule_ingester/granule_ingester/processors/GenerateTileId.py
new file mode 100644
index 0000000..2d965f7
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/GenerateTileId.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
+class GenerateTileId(TileProcessor):
+
+    def process(self, tile: nexusproto.NexusTile, *args, **kwargs):
+        granule = os.path.basename(tile.summary.granule)
+        variable_name = tile.summary.data_var_name
+        spec = tile.summary.section_spec
+        generated_id = uuid.uuid3(uuid.NAMESPACE_DNS, granule + variable_name + spec)
+
+        tile.summary.tile_id = str(generated_id)
+        return tile
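Because uuid.uuid3 is a deterministic (MD5-based) hash of its name argument, re-ingesting the same granule, variable, and section spec always yields the same tile ID, so repeated ingestion overwrites rather than duplicates tiles. A quick check (inputs are placeholders):

    >>> import uuid
    >>> name = 'granule.nc' + 'sst' + 'time:0:1,lat:0:30,lon:0:30'
    >>> uuid.uuid3(uuid.NAMESPACE_DNS, name) == uuid.uuid3(uuid.NAMESPACE_DNS, name)
    True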
diff --git a/granule_ingester/granule_ingester/processors/TileProcessor.py b/granule_ingester/granule_ingester/processors/TileProcessor.py
new file mode 100644
index 0000000..d62c504
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/TileProcessor.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+# TODO: make this an informal interface, not an abstract class
+class TileProcessor(ABC):
+    @abstractmethod
+    def process(self, tile, *args, **kwargs):
+        pass
diff --git a/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
new file mode 100644
index 0000000..1fe5d7d
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
+class NoTimeException(Exception):
+    pass
+
+
+def find_time_min_max(tile_data):
+    if tile_data.time:
+        if isinstance(tile_data.time, nexusproto.ShapedArray):
+            time_data = from_shaped_array(tile_data.time)
+            return int(numpy.nanmin(time_data).item()), int(numpy.nanmax(time_data).item())
+        elif isinstance(tile_data.time, int):
+            return tile_data.time, tile_data.time
+
+    raise NoTimeException
+
+
+class TileSummarizingProcessor(TileProcessor):
+
+    def __init__(self, dataset_name: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._dataset_name = dataset_name
+
+    def process(self, tile, *args, **kwargs):
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+
+        latitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude))
+        longitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude))
+        data = from_shaped_array(tile_data.variable_data)
+
+        tile_summary = tile.summary if tile.HasField("summary") else nexusproto.TileSummary()
+
+        tile_summary.dataset_name = self._dataset_name
+        tile_summary.bbox.lat_min = numpy.nanmin(latitudes).item()
+        tile_summary.bbox.lat_max = numpy.nanmax(latitudes).item()
+        tile_summary.bbox.lon_min = numpy.nanmin(longitudes).item()
+        tile_summary.bbox.lon_max = numpy.nanmax(longitudes).item()
+        tile_summary.stats.min = numpy.nanmin(data).item()
+        tile_summary.stats.max = numpy.nanmax(data).item()
+        tile_summary.stats.count = data.size - numpy.count_nonzero(numpy.isnan(data))
+
+        # In order to accurately calculate the average we need to weight the data based on the cosine of its latitude
+        # This is handled slightly differently for swath vs. grid data
+        if tile_type == 'swath_tile':
+            # For Swath tiles, len(data) == len(latitudes) == len(longitudes).
+            # So we can simply weight each element in the data array
+            tile_summary.stats.mean = type(self).calculate_mean_for_swath_tile(data, latitudes)
+        elif tile_type == 'grid_tile':
+            # Grid tiles need to repeat the weight for every longitude
+            # TODO This assumes data axes are ordered as latitude x longitude
+            tile_summary.stats.mean = type(self).calculate_mean_for_grid_tile(data, latitudes, longitudes)
+        else:
+            # Default to simple average with no weighting
+            tile_summary.stats.mean = numpy.nanmean(data).item()
+
+        try:
+            min_time, max_time = find_time_min_max(tile_data)
+            tile_summary.stats.min_time = min_time
+            tile_summary.stats.max_time = max_time
+        except NoTimeException:
+            pass
+
+        tile.summary.CopyFrom(tile_summary)
+        return tile
+
+    @staticmethod
+    def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes):
+        flattened_variable_data = numpy.ma.masked_invalid(variable_data).flatten()
+        repeated_latitudes = numpy.repeat(latitudes, len(longitudes))
+        weights = numpy.cos(numpy.radians(repeated_latitudes))
+        return numpy.ma.average(flattened_variable_data, weights=weights).item()
+
+    @staticmethod
+    def calculate_mean_for_swath_tile(variable_data, latitudes):
+        weights = numpy.cos(numpy.radians(latitudes))
+        return numpy.ma.average(numpy.ma.masked_invalid(variable_data),
+                                weights=weights).item()
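The latitude weighting is a standard area correction for regular lat/lon grids: cell area shrinks with cos(latitude), so the mean is sum(cos(lat_i) * x_i) / sum(cos(lat_i)). A minimal numeric sketch of the repeat-then-average logic in calculate_mean_for_grid_tile (values are placeholders):

    import numpy
    data = numpy.array([[10.0, 10.0], [20.0, 20.0]])  # 2 latitudes x 2 longitudes
    latitudes = numpy.array([0.0, 60.0])              # cosine weights: 1.0, 0.5
    weights = numpy.cos(numpy.radians(numpy.repeat(latitudes, 2)))
    mean = numpy.ma.average(data.flatten(), weights=weights)
    # (1.0*10 + 1.0*10 + 0.5*20 + 0.5*20) / (1.0 + 1.0 + 0.5 + 0.5) = 40 / 3 ≈ 13.33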
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
new file mode 100644
index 0000000..592d8ea
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -0,0 +1,5 @@
+from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
+from granule_ingester.processors.GenerateTileId import GenerateTileId
+from granule_ingester.processors.TileProcessor import TileProcessor
+from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
new file mode 100644
index 0000000..e728418
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
+class KelvinToCelsius(TileProcessor):
+    def process(self, tile, *args, **kwargs):
+        the_tile_type = tile.tile.WhichOneof("tile_type")
+        the_tile_data = getattr(tile.tile, the_tile_type)
+
+        var_data = from_shaped_array(the_tile_data.variable_data) - 273.15
+
+        the_tile_data.variable_data.CopyFrom(to_shaped_array(var_data))
+
+        return tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
new file mode 100644
index 0000000..1876013
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
+class EccoReadingProcessor(TileReadingProcessor):
+    def __init__(self,
+                 variable_to_read,
+                 latitude,
+                 longitude,
+                 tile,
+                 depth=None,
+                 time=None,
+                 **kwargs):
+        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+
+        self.depth = depth
+        self.time = time
+        self.tile = tile
+
+    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
+        new_tile = nexusproto.EccoTile()
+
+        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
+        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
+        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
+        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
+
+        data_subset = ds[self.variable_to_read][
+            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
+        data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
+
+        new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()
+
+        if self.depth:
+            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
+                                                                          dimensions_to_slices).items())[0]
+            depth_slice_len = depth_slice.stop - depth_slice.start
+            if depth_slice_len > 1:
+                raise RuntimeError(
+                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
+                                                                                                dim_len=depth_slice_len))
+            new_tile.depth = ds[self.depth][depth_slice].item()
+
+        if self.time:
+            time_slice = dimensions_to_slices[self.time]
+            time_slice_len = time_slice.stop - time_slice.start
+            if time_slice_len > 1:
+                raise RuntimeError(
+                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
+                                                                                               dim_len=time_slice_len))
+            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)
+
+        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
+        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+
+        input_tile.tile.ecco_tile.CopyFrom(new_tile)
+        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
new file mode 100644
index 0000000..4354f9e
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -0,0 +1,53 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
+class GridReadingProcessor(TileReadingProcessor):
+    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
+        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+        self.depth = depth
+        self.time = time
+
+    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
+        new_tile = nexusproto.GridTile()
+
+        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
+        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
+        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
+        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
+
+        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
+                                                                                dimensions_to_slices)]
+        data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
+
+        if self.depth:
+            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
+                                                                          dimensions_to_slices).items())[0]
+            depth_slice_len = depth_slice.stop - depth_slice.start
+            if depth_slice_len > 1:
+                raise RuntimeError(
+                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
+                                                                                                dim_len=depth_slice_len))
+            new_tile.depth = ds[self.depth][depth_slice].item()
+
+        if self.time:
+            time_slice = dimensions_to_slices[self.time]
+            time_slice_len = time_slice.stop - time_slice.start
+            if time_slice_len > 1:
+                raise RuntimeError(
+                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
+                                                                                               dim_len=time_slice_len))
+            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)
+
+        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
+        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+
+        input_tile.tile.grid_tile.CopyFrom(new_tile)
+        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
new file mode 100644
index 0000000..fec28ca
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -0,0 +1,47 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
+class SwathReadingProcessor(TileReadingProcessor):
+    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+        self.depth = depth
+        self.time = time
+
+    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
+        new_tile = nexusproto.SwathTile()
+
+        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
+        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
+        lat_subset = np.ma.filled(lat_subset, np.NaN)
+        lon_subset = np.ma.filled(lon_subset, np.NaN)
+
+        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
+        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
+
+        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
+                                                                                dimensions_to_slices)]
+        data_subset = np.ma.filled(data_subset, np.NaN)
+
+        if self.depth:
+            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
+                                                                          dimensions_to_slices).items())[0]
+            depth_slice_len = depth_slice.stop - depth_slice.start
+            if depth_slice_len > 1:
+                raise RuntimeError(
+                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
+                                                                                                dim_len=depth_slice_len))
+            new_tile.depth = ds[self.depth][depth_slice].item()
+
+        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
+        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+        new_tile.time.CopyFrom(to_shaped_array(time_subset))
+        input_tile.tile.swath_tile.CopyFrom(new_tile)
+        return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
new file mode 100644
index 0000000..14a44f5
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from abc import ABC, abstractmethod
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+
+class TileReadingProcessor(TileProcessor, ABC):
+
+    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
+        self.variable_to_read = variable_to_read
+        self.latitude = latitude
+        self.longitude = longitude
+
+        # Common optional properties
+        self.temp_dir = None
+        self.metadata = None
+        # self.temp_dir = self.environ['TEMP_DIR']
+        # self.metadata = self.environ['META']
+
+    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
+        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+
+        output_tile = nexusproto.NexusTile()
+        output_tile.CopyFrom(tile)
+        output_tile.summary.data_var_name = self.variable_to_read
+
+        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+
+    @abstractmethod
+    def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
+        pass
+
+    @classmethod
+    def _parse_input(cls, the_input_tile, temp_dir):
+        specs = the_input_tile.summary.section_spec
+        tile_specifications = cls._convert_spec_to_slices(specs)
+
+        file_path = the_input_tile.summary.granule
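+        # Granule paths may arrive as URIs; strip a leading 'file:' scheme,
+        # e.g. 'file:/tmp/granule.nc' -> '/tmp/granule.nc'.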
+        file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path
+
+        return tile_specifications, file_path
+
+    @staticmethod
+    def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]:
+        return {dim_name: dimension_to_slice[dim_name] for dim_name in variable.dims}
+
+    @staticmethod
+    def _convert_spec_to_slices(spec):
+        dim_to_slice = {}
+        for dimension in spec.split(','):
+            name, start, stop = dimension.split(':')
+            dim_to_slice[name] = slice(int(start), int(stop))
+
+        return dim_to_slice
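+
+    # Illustrative example: a section_spec of 'time:0:1,lat:0:10,lon:0:5'
+    # parses to {'time': slice(0, 1), 'lat': slice(0, 10), 'lon': slice(0, 5)};
+    # _slices_for_variable then picks out the entries matching a variable's dims.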
+
+    @staticmethod
+    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
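+        # datetime64[ns] values minus the Unix epoch yield timedelta64[ns];
+        # dividing by 1e9 and casting to int gives whole seconds, e.g.
+        # 2016-01-01T00:00:00 -> 1451606400. float32 inputs are assumed to
+        # already be numeric times and are passed through unchanged.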
+        if times.dtype == np.float32:
+            return times
+        epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
+        return ((times - epoch) / 1e9).astype(int)
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..2831c0c
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -0,0 +1,83 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+
+
+class TimeSeriesReadingProcessor(TileReadingProcessor):
+    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+
+        self.depth = depth
+        self.time = time
+
+    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
+        new_tile = nexusproto.TimeSeriesTile()
+
+        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
+        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
+        lat_subset = np.ma.filled(lat_subset, np.NaN)
+        lon_subset = np.ma.filled(lon_subset, np.NaN)
+
+        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
+                                                                                dimensions_to_slices)]
+        data_subset = np.ma.filled(data_subset, np.NaN)
+
+        if self.depth:
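+            # Each tile carries a single scalar depth, so the slicer must have
+            # produced a depth slice of length 1; anything wider is rejected.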
+            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
+                                                                          dimensions_to_slices).items())[0]
+            depth_slice_len = depth_slice.stop - depth_slice.start
+            if depth_slice_len > 1:
+                raise RuntimeError(
+                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
+                                                                                                dim_len=depth_slice_len))
+            new_tile.depth = ds[self.depth][depth_slice].item()
+
+        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
+        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
+
+        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
+        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+        new_tile.time.CopyFrom(to_shaped_array(time_subset))
+
+        input_tile.tile.time_series_tile.CopyFrom(new_tile)
+        return input_tile
+
+    # def read_data(self, tile_specifications, file_path, output_tile):
+    #     with xr.decode_cf(xr.open_dataset(file_path, decode_cf=False), decode_times=False) as ds:
+    #         for section_spec, dimtoslice in tile_specifications:
+    #             tile = nexusproto.TimeSeriesTile()
+    #
+    #             instance_dimension = next(
+    #                 iter([dim for dim in ds[self.variable_to_read].dims if dim != self.time]))
+    #
+    #             tile.latitude.CopyFrom(
+    #                 to_shaped_array(np.ma.filled(ds[self.latitude].data[dimtoslice[instance_dimension]], np.NaN)))
+    #
+    #             tile.longitude.CopyFrom(
+    #                 to_shaped_array(
+    #                     np.ma.filled(ds[self.longitude].data[dimtoslice[instance_dimension]], np.NaN)))
+    #
+    #             # Before we read the data we need to make sure the dimensions are in the proper order so we don't
+    #             # have any indexing issues
+    #             ordered_slices = slices_for_variable(ds, self.variable_to_read, dimtoslice)
+    #             # Read data using the ordered slices, replacing masked values with NaN
+    #             data_array = np.ma.filled(ds[self.variable_to_read].data[tuple(ordered_slices.values())], np.NaN)
+    #
+    #             tile.variable_data.CopyFrom(to_shaped_array(data_array))
+    #
+    #             if self.metadata is not None:
+    #                 tile.meta_data.add().CopyFrom(
+    #                     to_metadata(self.metadata, ds[self.metadata].data[tuple(ordered_slices.values())]))
+    #
+    #             tile.time.CopyFrom(
+    #                 to_shaped_array(np.ma.filled(ds[self.time].data[dimtoslice[self.time]], np.NaN)))
+    #
+    #             output_tile.tile.time_series_tile.CopyFrom(tile)
+    #
+    #             yield output_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
new file mode 100644
index 0000000..2fecce9
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
@@ -0,0 +1,5 @@
+from granule_ingester.processors.reading_processors.EccoReadingProcessor import EccoReadingProcessor
+from granule_ingester.processors.reading_processors.GridReadingProcessor import GridReadingProcessor
+from granule_ingester.processors.reading_processors.SwathReadingProcessor import SwathReadingProcessor
+from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
+from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py
new file mode 100644
index 0000000..193cf69
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py
@@ -0,0 +1,55 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple,Set
+#
+# # from granule_ingester.entities import Dimension
+#
+#
+# class SliceFileByDimension:
+#     def __init__(self,
+#                  slice_dimension: str,    # slice by this dimension
+#                  dimension_name_prefix: str = None,
+#                  *args, **kwargs):
+#         super().__init__(*args, **kwargs)
+#         self._slice_by_dimension = slice_dimension
+#         self._dimension_name_prefix = dimension_name_prefix
+#
+#     def generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+#         """
+#         Generate list of slices in all dimensions as strings.
+#
+#         :param dimension_specs: A dict of dimension names to dimension lengths
+#         :return: A list of slices across all dimensions, as strings in the form of dim1:0:720,dim2:0:1,dim3:0:360
+#         """
+#         # check if sliceDimension is int or str
+#         is_integer: bool = False
+#         try:
+#             int(self._slice_by_dimension)
+#             is_integer = True
+#         except ValueError:
+#             pass
+#
+#         return self._indexed_dimension_slicing(dimension_specs) if is_integer else self._generate_tile_boundary_slices(self._slice_by_dimension,dimension_specs)
+#
+#     def _indexed_dimension_slicing(self, dimension_specs):
+#         # python netCDF4 library automatically prepends "phony_dim" if indexed by integer
+#         if self._dimension_name_prefix == None or self._dimension_name_prefix == "":
+#             self._dimension_name_prefix = "phony_dim_"
+#
+#         return self._generate_tile_boundary_slices((self._dimension_name_prefix+self._slice_by_dimension),dimension_specs)
+#
+#     def _generate_tile_boundary_slices(self, slice_by_dimension, dimension_specs):
+#         dimension_bounds = []
+#         for dim_name,dim_len in dimension_specs.items():
+#             step_size = 1 if dim_name==slice_by_dimension else dim_len
+#
+#             bounds = []
+#             for i in range(0,dim_len,step_size):
+#                 bounds.append(
+#                     '{name}:{start}:{end}'.format(name=dim_name,
+#                                     start=i,
+#                                     end=min((i + step_size),dim_len) )
+#                 )
+#             dimension_bounds.append(bounds)
+#         return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
+#
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
new file mode 100644
index 0000000..6e03336
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+from typing import List, Dict
+
+from granule_ingester.slicers.TileSlicer import TileSlicer
+
+logger = logging.getLogger(__name__)
+
+
+class SliceFileByStepSize(TileSlicer):
+    def __init__(self,
+                 dimension_step_sizes: Dict[str, int],
+                 *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._dimension_step_sizes = dimension_step_sizes
+
+    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+        # make sure all provided dimensions are in dataset
+        for dim_name in self._dimension_step_sizes.keys():
+            if dim_name not in list(dimension_specs.keys()):
+                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))
+
+        slices = self._generate_chunk_boundary_slices(dimension_specs)
+        logger.info("Sliced granule into {} slices.".format(len(slices)))
+        return slices
+
+    def _generate_chunk_boundary_slices(self, dimension_specs) -> list:
+        dimension_bounds = []
+        dim_step_keys = self._dimension_step_sizes.keys()
+
+        for dim_name, dim_len in dimension_specs.items():
+            step_size = self._dimension_step_sizes[dim_name] if dim_name in dim_step_keys else dim_len
+
+            bounds = []
+            for i in range(0, dim_len, step_size):
+                bounds.append('{name}:{start}:{end}'.format(name=dim_name,
+                                                            start=i,
+                                                            end=min((i + step_size), dim_len)))
+            dimension_bounds.append(bounds)
+        return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
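+
+    # Illustrative example: with dimension_specs {'time': 2, 'lat': 4} and
+    # dimension_step_sizes {'time': 1, 'lat': 2}, the per-dimension bounds are
+    # ['time:0:1', 'time:1:2'] and ['lat:0:2', 'lat:2:4'], and their cartesian
+    # product yields four section specs:
+    #   'time:0:1,lat:0:2', 'time:0:1,lat:2:4',
+    #   'time:1:2,lat:0:2', 'time:1:2,lat:2:4'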
diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py
new file mode 100644
index 0000000..a4d401a
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py
@@ -0,0 +1,68 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple
+#
+# # from granule_ingester.entities import Dimension
+#
+#
+# class SliceFileByTilesDesired:
+#     def __init__(self,
+#                  tiles_desired: int,
+#                  desired_spatial_dimensions: List[str],
+#                  time_dimension: Tuple[str, int] = None,
+#                  *args, **kwargs):
+#         super().__init__(*args, **kwargs)
+#         self._tiles_desired = tiles_desired
+#
+#         # check that desired_dimensions have no duplicates
+#         self._desired_spatial_dimensions = desired_spatial_dimensions
+#         self._time_dimension = time_dimension
+#
+#     def generate_slices(self,
+#                         dimension_specs: Dict[str, int]) -> List[str]:
+#         # check that dimension_specs contain all desired_dimensions
+#         # check that there are no duplicate keys in dimension_specs
+#         desired_dimension_specs = {key: dimension_specs[key]
+#                                    for key in self._desired_spatial_dimensions}
+#         spatial_slices = self._generate_spatial_slices(tiles_desired=self._tiles_desired,
+#                                                        dimension_specs=desired_dimension_specs)
+#         if self._time_dimension:
+#             temporal_slices = self._generate_temporal_slices(self._time_dimension)
+#             return self._add_temporal_slices(temporal_slices, spatial_slices)
+#         return spatial_slices
+#
+#     def _add_temporal_slices(self, temporal_slices, spatial_slices) -> List[str]:
+#         return ['{time},{space}'.format(time=temporal_slice, space=spatial_slice)
+#                 for spatial_slice in spatial_slices
+#                 for temporal_slice in temporal_slices]
+#
+#     def _generate_temporal_slices(self, time_dimension: Tuple[str, int]):
+#         return ['{time_dim}:{start}:{end}'.format(time_dim=time_dimension[0],
+#                                                   start=i,
+#                                                   end=i+1)
+#                 for i in range(time_dimension[1]-1)]
+#
+#     def _generate_spatial_slices(self,
+#                                  tiles_desired: int,
+#                                  dimension_specs: Dict[str, int]) -> List[str]:
+#         n_dimensions = len(dimension_specs)
+#         dimension_bounds = []
+#         for dim_name, dim_length in dimension_specs.items():
+#             step_size = SliceFileByTilesDesired._calculate_step_size(
+#                 dim_length, tiles_desired, n_dimensions)
+#             bounds = []
+#             start_loc = 0
+#             while start_loc < dim_length:
+#                 end_loc = start_loc + step_size if start_loc + \
+#                     step_size < dim_length else dim_length
+#                 bounds.append('{name}:{start}:{end}'.format(name=dim_name,
+#                                                             start=start_loc,
+#                                                             end=end_loc))
+#                 start_loc += step_size
+#             dimension_bounds.append(bounds)
+#         return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
+#
+#     @staticmethod
+#     def _calculate_step_size(dim_length, chunks_desired, n_dimensions) -> int:
+#         chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions)
+#         return math.floor(dim_length / chunks_per_dim)
diff --git a/granule_ingester/granule_ingester/slicers/TileSlicer.py b/granule_ingester/granule_ingester/slicers/TileSlicer.py
new file mode 100644
index 0000000..06cf094
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/TileSlicer.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import List
+
+import xarray as xr
+from nexusproto.DataTile_pb2 import NexusTile
+
+
+class TileSlicer(ABC):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._granule_name = None
+        self._current_tile_spec_index = 0
+        self._tile_spec_list: List[str] = []
+
+    def __iter__(self):
+        return self
+
+    def __next__(self) -> NexusTile:
+        if self._current_tile_spec_index == len(self._tile_spec_list):
+            raise StopIteration
+
+        current_tile_spec = self._tile_spec_list[self._current_tile_spec_index]
+        self._current_tile_spec_index += 1
+
+        tile = NexusTile()
+        tile.summary.section_spec = current_tile_spec
+        tile.summary.granule = self._granule_name
+        return tile
+
+    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
+        self._granule_name = granule_name
+        dimensions = dataset.dims
+        self._tile_spec_list = self._generate_slices(dimensions)
+
+        return self
+
+    @abstractmethod
+    def _generate_slices(self, dimensions):
+        pass
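+
+    # Usage sketch (illustrative): a concrete slicer such as SliceFileByStepSize
+    # is primed with a dataset via generate_tiles(), then iterated to produce
+    # skeleton tiles whose summaries carry only section_spec and granule name:
+    #
+    #   slicer = SliceFileByStepSize(dimension_step_sizes={'time': 1})
+    #   for tile in slicer.generate_tiles(ds, granule_name='foo.nc'):
+    #       ...  # each tile is later filled in by a reading processor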
diff --git a/granule_ingester/granule_ingester/slicers/__init__.py b/granule_ingester/granule_ingester/slicers/__init__.py
new file mode 100644
index 0000000..b13ea59
--- /dev/null
+++ b/granule_ingester/granule_ingester/slicers/__init__.py
@@ -0,0 +1,2 @@
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
+from granule_ingester.slicers.TileSlicer import TileSlicer
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
new file mode 100644
index 0000000..7a9f146
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import uuid
+
+from cassandra.cluster import Cluster, Session
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from nexusproto.DataTile_pb2 import NexusTile, TileData
+
+from granule_ingester.writers.DataStore import DataStore
+
+logging.getLogger('cassandra').setLevel(logging.INFO)
+logger = logging.getLogger(__name__)
+
+
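+# cqlengine model describing the nexustiles.sea_surface_temp table. Note that
+# save_data below writes through a prepared CQL statement rather than this
+# model, so the model chiefly documents the expected schema.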
+class TileModel(Model):
+    __keyspace__ = "nexustiles"
+    __table_name__ = "sea_surface_temp"
+    tile_id = columns.UUID(primary_key=True)
+    tile_blob = columns.Bytes(index=True)
+
+
+class CassandraStore(DataStore):
+    def __init__(self, contact_points=None, port=9042):
+        self._contact_points = contact_points
+        self._port = port
+        self._session = None
+
+    async def health_check(self) -> bool:
+        try:
+            session = self._get_session()
+            session.shutdown()
+            return True
+        except Exception:
+            logger.error("Cannot connect to Cassandra!")
+            return False
+
+    def _get_session(self) -> Session:
+        cluster = Cluster(contact_points=self._contact_points, port=self._port)
+        session = cluster.connect()
+        session.set_keyspace('nexustiles')
+        return session
+
+    def connect(self):
+        self._session = self._get_session()
+
+    def __del__(self):
+        if self._session:
+            self._session.shutdown()
+
+    async def save_data(self, tile: NexusTile) -> None:
+        tile_id = uuid.UUID(tile.summary.tile_id)
+        serialized_tile_data = TileData.SerializeToString(tile.tile)
+        prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+        await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+
+    @staticmethod
+    async def _execute_query_async(session: Session, query, parameters=None):
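+        # Bridge the cassandra-driver ResponseFuture to asyncio: the driver's
+        # success/error callbacks resolve an asyncio.Future that this
+        # coroutine awaits, so queries do not block the event loop.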
+        cassandra_future = session.execute_async(query, parameters)
+        asyncio_future = asyncio.Future()
+        cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
+        return await asyncio_future
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
new file mode 100644
index 0000000..889d41e
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -0,0 +1,13 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.healthcheck import HealthCheck
+
+
+class DataStore(HealthCheck, ABC):
+
+    @abstractmethod
+    def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
new file mode 100644
index 0000000..26311af
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -0,0 +1,11 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.healthcheck import HealthCheck
+
+
+class MetadataStore(HealthCheck, ABC):
+    @abstractmethod
+    def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
+        pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
new file mode 100644
index 0000000..9d6a7f0
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from asyncio import AbstractEventLoop
+
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+import aiohttp
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
+from tenacity import retry, stop_after_attempt
+
+from granule_ingester.writers.MetadataStore import MetadataStore
+
+logger = logging.getLogger(__name__)
+
+
+class SolrStore(MetadataStore):
+    def __init__(self, host_and_port='http://localhost:8983'):
+        super().__init__()
+
+        self.TABLE_NAME = "sea_surface_temp"
+        self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
+
+        self._host_and_port = host_and_port
+        self.geo_precision: int = 3
+        self.collection: str = "nexustiles"
+        self.log: logging.Logger = logging.getLogger(__name__)
+        self.log.setLevel(logging.DEBUG)
+        self._session = None
+
+    def connect(self, loop: AbstractEventLoop = None):
+        self._session = aiohttp.ClientSession(loop=loop)
+
+    async def health_check(self):
+        try:
+            async with aiohttp.ClientSession() as session:
+                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
+                if response.status == 200:
+                    return True
+                else:
+                    logger.error("Solr health check returned status {}.".format(response.status))
+        except aiohttp.ClientConnectionError:
+            logger.error("Cannot connect to Solr!")
+
+        return False
+
+    async def save_metadata(self, nexus_tile: NexusTile) -> None:
+        solr_doc = self._build_solr_doc(nexus_tile)
+
+        await self._save_document(self.collection, solr_doc)
+
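+    # tenacity retries the coroutine up to 5 attempts; the RuntimeError raised
+    # below on a non-2xx/3xx response is what triggers each retry.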
+    @retry(stop=stop_after_attempt(5))
+    async def _save_document(self, collection: str, doc: dict):
+        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
+        response = await self._session.post(url, json=doc)
+        if response.status < 200 or response.status >= 400:
+            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+
+    def _build_solr_doc(self, tile: NexusTile) -> Dict:
+        summary: TileSummary = tile.summary
+        bbox: TileSummary.BBox = summary.bbox
+        stats: TileSummary.DataStats = summary.stats
+
+        min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
+        max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)
+
+        geo = self.determine_geo(bbox)
+
+        granule_file_name: str = Path(summary.granule).name  # get base filename
+
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+
+        input_document = {
+            'table_s': self.TABLE_NAME,
+            'geo': geo,
+            'id': summary.tile_id,
+            'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id),
+            'sectionSpec_s': summary.section_spec,
+            'dataset_s': summary.dataset_name,
+            'granule_s': granule_file_name,
+            'tile_var_name_s': summary.data_var_name,
+            'tile_min_lon': bbox.lon_min,
+            'tile_max_lon': bbox.lon_max,
+            'tile_min_lat': bbox.lat_min,
+            'tile_max_lat': bbox.lat_max,
+            'tile_depth': tile_data.depth,
+            'tile_min_time_dt': min_time,
+            'tile_max_time_dt': max_time,
+            'tile_min_val_d': stats.min,
+            'tile_max_val_d': stats.max,
+            'tile_avg_val_d': stats.mean,
+            'tile_count_i': int(stats.count)
+        }
+
+        ecco_tile_id = getattr(tile_data, 'tile', None)
+        if ecco_tile_id:
+            input_document['ecco_tile'] = ecco_tile_id
+
+        for attribute in summary.global_attributes:
+            # Python protobuf exposes message fields directly rather than
+            # through Java-style getName()/getValues() accessors.
+            values = list(attribute.values)
+            input_document[attribute.name] = values[0] if len(values) == 1 else values
+
+        return input_document
+
+    @staticmethod
+    def _format_latlon_string(value):
+        rounded_value = round(value, 3)
+        return '{:.3f}'.format(rounded_value)
+
+    @classmethod
+    def determine_geo(cls, bbox: TileSummary.BBox) -> str:
+        # Solr cannot index a POLYGON where all corners are the same point or when there are only
+        # 2 distinct points (line). Solr is configured for a specific precision so we need to round
+        # to that precision before checking equality.
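+        # Illustrative outputs: a degenerate bbox (0, 0, 0, 0) is indexed as
+        # 'POINT(0.000 0.000)'; one with lat_min == lat_max but distinct
+        # longitudes 0 and 10 as 'LINESTRING(0.000 0.000, 10.000 0.000)'; all
+        # other bboxes as a closed five-point 'POLYGON((...))'.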
+        lat_min_str = cls._format_latlon_string(bbox.lat_min)
+        lat_max_str = cls._format_latlon_string(bbox.lat_max)
+        lon_min_str = cls._format_latlon_string(bbox.lon_min)
+        lon_max_str = cls._format_latlon_string(bbox.lon_max)
+
+        # If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON
+        if bbox.lat_min == bbox.lat_max and bbox.lon_min == bbox.lon_max:
+            geo = 'POINT({} {})'.format(lon_min_str, lat_min_str)
+        # If lat min = lat max but lon min != lon max, or lon min = lon max but lat min != lat max,
+        # then we essentially have a line.
+        elif bbox.lat_min == bbox.lat_max or bbox.lon_min == bbox.lon_max:
+            geo = 'LINESTRING({} {}, {} {})'.format(lon_min_str, lat_min_str, lon_max_str, lat_max_str)
+        # All other cases should use POLYGON
+        else:
+            geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(lon_min_str, lat_min_str,
+                                                                        lon_max_str, lat_min_str,
+                                                                        lon_max_str, lat_max_str,
+                                                                        lon_min_str, lat_max_str,
+                                                                        lon_min_str, lat_min_str)
+
+        return geo
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
new file mode 100644
index 0000000..9323d8c
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -0,0 +1,4 @@
+from granule_ingester.writers.DataStore import DataStore
+from granule_ingester.writers.MetadataStore import MetadataStore
+from granule_ingester.writers.SolrStore import SolrStore
+from granule_ingester.writers.CassandraStore import CassandraStore
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
new file mode 100644
index 0000000..4d9d4cb
--- /dev/null
+++ b/granule_ingester/requirements.txt
@@ -0,0 +1,3 @@
+cassandra-driver==3.23.0
+aiomultiprocess
+aioboto3
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
new file mode 100644
index 0000000..2a5920e
--- /dev/null
+++ b/granule_ingester/setup.py
@@ -0,0 +1,34 @@
+from subprocess import check_call, CalledProcessError
+
+from setuptools import setup, find_packages
+
+with open('requirements.txt') as f:
+    pip_requirements = f.readlines()
+
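+# Install conda-only dependencies before pip requirements; this assumes
+# `conda` is available on PATH and aborts the build if the install fails.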
+try:
+    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+except (CalledProcessError, IOError) as e:
+    raise EnvironmentError("Error installing conda packages", e)
+
+__version__ = '1.0.0-SNAPSHOT'
+
+setup(
+    name='sdap_granule_ingester',
+    version=__version__,
+    url="https://github.com/apache/incubator-sdap-ingester",
+    author="dev@sdap.apache.org",
+    author_email="dev@sdap.apache.org",
+    description="Python modules that can be used for NEXUS ingest.",
+    install_requires=pip_requirements,
+    packages=find_packages(
+        exclude=["*.tests", "*.tests.*", "tests.*", "tests", "scripts"]),
+    test_suite="tests",
+    platforms='any',
+    python_requires='>=3.7',
+    classifiers=[
+        'Development Status :: 1 - Pre-Alpha',
+        'Intended Audience :: Developers',
+        'Operating System :: OS Independent',
+        'Programming Language :: Python :: 3.7',
+    ]
+)
diff --git a/granule_ingester/tests/__init__.py b/granule_ingester/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/config_files/analysed_sst.yml b/granule_ingester/tests/config_files/analysed_sst.yml
new file mode 100644
index 0000000..9148f98
--- /dev/null
+++ b/granule_ingester/tests/config_files/analysed_sst.yml
@@ -0,0 +1,16 @@
+slicer:
+  name: sliceFileByStepSize
+  dimension_step_sizes:
+    time: 1
+    lon: 10
+    lat: 10
+processors:
+  - name: GridReadingProcessor
+    latitude: lat
+    longitude: lon
+    time: time
+    variable_to_read: analysed_sst
+  - name: emptyTileFilter
+  - name: tileSummary
+    dataset_name: AVHRR_sst
+  - name: generateTileId
diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
new file mode 100644
index 0000000..9af889d
--- /dev/null
+++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
@@ -0,0 +1,17 @@
+granule:
+  resource: ../foo/bar.nc
+slicer:
+  name: sliceFileByStepSize
+  dimension_step_sizes:
+    time: 1
+    lat: 33
+    lon: 26
+processors:
+  - name: EccoReadingProcessor
+    latitude: YC
+    longitude: XC
+    time: time
+    depth: Z
+    tile: tile
+    variable_to_read: THETA
+  - name: generateTileId
diff --git a/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc
new file mode 100644
index 0000000..4935c81
Binary files /dev/null and b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc differ
diff --git a/granule_ingester/tests/granules/OBP_2017_01.nc b/granule_ingester/tests/granules/OBP_2017_01.nc
new file mode 100644
index 0000000..8c9b584
Binary files /dev/null and b/granule_ingester/tests/granules/OBP_2017_01.nc differ
diff --git a/granule_ingester/tests/granules/OBP_native_grid.nc b/granule_ingester/tests/granules/OBP_native_grid.nc
new file mode 100755
index 0000000..addb8a0
Binary files /dev/null and b/granule_ingester/tests/granules/OBP_native_grid.nc differ
diff --git a/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5
new file mode 100644
index 0000000..11815dd
Binary files /dev/null and b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 differ
diff --git a/granule_ingester/tests/granules/THETA_199201.nc b/granule_ingester/tests/granules/THETA_199201.nc
new file mode 100644
index 0000000..ad92a61
Binary files /dev/null and b/granule_ingester/tests/granules/THETA_199201.nc differ
diff --git a/granule_ingester/tests/granules/empty_mur.nc4 b/granule_ingester/tests/granules/empty_mur.nc4
new file mode 100644
index 0000000..f65c808
Binary files /dev/null and b/granule_ingester/tests/granules/empty_mur.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_ascatb.nc4 b/granule_ingester/tests/granules/not_empty_ascatb.nc4
new file mode 100644
index 0000000..d8ef90b
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_ascatb.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_avhrr.nc4 b/granule_ingester/tests/granules/not_empty_avhrr.nc4
new file mode 100644
index 0000000..af24071
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_avhrr.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_ccmp.nc b/granule_ingester/tests/granules/not_empty_ccmp.nc
new file mode 100644
index 0000000..b7b491d
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_ccmp.nc differ
diff --git a/granule_ingester/tests/granules/not_empty_mur.nc4 b/granule_ingester/tests/granules/not_empty_mur.nc4
new file mode 100644
index 0000000..09d31fd
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_mur.nc4 differ
diff --git a/granule_ingester/tests/granules/not_empty_smap.h5 b/granule_ingester/tests/granules/not_empty_smap.h5
new file mode 100644
index 0000000..956cbc5
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_smap.h5 differ
diff --git a/granule_ingester/tests/granules/not_empty_wswm.nc b/granule_ingester/tests/granules/not_empty_wswm.nc
new file mode 100644
index 0000000..ce0ebcc
Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_wswm.nc differ
diff --git a/granule_ingester/tests/pipeline/__init__.py b/granule_ingester/tests/pipeline/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
new file mode 100644
index 0000000..c18bf8b
--- /dev/null
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -0,0 +1,104 @@
+import os
+import unittest
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.pipeline.Pipeline import Pipeline
+from granule_ingester.processors import GenerateTileId
+from granule_ingester.processors.reading_processors import EccoReadingProcessor
+from granule_ingester.slicers.SliceFileByStepSize import *
+from granule_ingester.writers import DataStore, MetadataStore
+
+
+class TestPipeline(unittest.TestCase):
+    class MockProcessorNoParams:
+        def __init__(self):
+            pass
+
+    class MockProcessorWithParams:
+        def __init__(self, test_param):
+            self.test_param = test_param
+
+    def test_parse_config(self):
+        class MockDataStore(DataStore):
+            def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
+                pass
+
+        class MockMetadataStore(MetadataStore):
+            def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
+                pass
+
+        relative_path = "../config_files/ingestion_config_testfile.yaml"
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        pipeline = Pipeline.from_file(config_path=str(file_path),
+                                      data_store_factory=MockDataStore,
+                                      metadata_store_factory=MockMetadataStore)
+
+        self.assertEqual(pipeline._data_store_factory, MockDataStore)
+        self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
+        self.assertEqual(type(pipeline._slicer), SliceFileByStepSize)
+        self.assertEqual(type(pipeline._tile_processors[0]), EccoReadingProcessor)
+        self.assertEqual(type(pipeline._tile_processors[1]), GenerateTileId)
+
+    def test_parse_module(self):
+        module_mappings = {
+            "sliceFileByStepSize": SliceFileByStepSize
+        }
+
+        module_config = {
+            "name": "sliceFileByStepSize",
+            "dimension_step_sizes": {
+                "time": 1,
+                "lat": 10,
+                "lon": 10
+            }
+        }
+        module = Pipeline._parse_module(module_config, module_mappings)
+        self.assertEqual(SliceFileByStepSize, type(module))
+        self.assertEqual(module_config['dimension_step_sizes'], module._dimension_step_sizes)
+
+    def test_parse_module_with_no_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams}
+        module_config = {"name": "MockModule"}
+        module = Pipeline._parse_module(module_config, module_mappings)
+        self.assertEqual(type(module), TestPipeline.MockProcessorNoParams)
+
+    def test_parse_module_with_too_many_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams}
+        module_config = {
+            "name": "MockModule",
+            "bogus_param": True
+        }
+        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+
+    def test_parse_module_with_missing_parameters(self):
+        module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams}
+        module_config = {
+            "name": "MockModule"
+        }
+
+        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+
+    def test_process_tile(self):
+        # class MockIdProcessor:
+        #     def process(self, tile, *args, **kwargs):
+        #         tile.summary.tile_id = "test_id"
+        #         return tile
+        #
+        # class MockReadingProcessor:
+        #     def process(self, tile, *args, **kwargs):
+        #         dataset = kwargs['dataset']
+        #         tile.tile.grid_tile.variable_data.CopyFrom(to_shaped_array(dataset['test_variable']))
+        #         return tile
+        #
+        # test_dataset = xr.Dataset({"test_variable": [1, 2, 3]})
+        # input_tile = nexusproto.NexusTile.SerializeToString(NexusTile())
+        # processor_list = [MockIdProcessor(), MockReadingProcessor()]
+        #
+        # output_tile = _process_tile_in_worker(processor_list, test_dataset, input_tile)
+        # output_tile = nexusproto.NexusTile.FromString(output_tile)
+        # tile_data = from_shaped_array(output_tile.tile.grid_tile.variable_data)
+        #
+        # np.testing.assert_equal(tile_data, [1, 2, 3])
+        # self.assertEqual(output_tile.summary.tile_id, "test_id")
+        ...
diff --git a/granule_ingester/tests/processors/__init__.py b/granule_ingester/tests/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/processors/test_GenerateTileId.py b/granule_ingester/tests/processors/test_GenerateTileId.py
new file mode 100644
index 0000000..17f1677
--- /dev/null
+++ b/granule_ingester/tests/processors/test_GenerateTileId.py
@@ -0,0 +1,22 @@
+import unittest
+
+import uuid
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors import GenerateTileId
+
+
+class TestGenerateTileId(unittest.TestCase):
+
+    def test_process(self):
+        processor = GenerateTileId()
+
+        tile = nexusproto.NexusTile()
+        tile.summary.granule = 'test_dir/test_granule.nc'
+        tile.summary.data_var_name = 'test_variable'
+        tile.summary.section_spec = 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9'
+
+        expected_id = uuid.uuid3(uuid.NAMESPACE_DNS,
+                                 'test_granule.nc' + 'test_variable' + 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9')
+
+        self.assertEqual(str(expected_id), processor.process(tile).summary.tile_id)
diff --git a/granule_ingester/tests/reading_processors/__init__.py b/granule_ingester/tests/reading_processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
new file mode 100644
index 0000000..f2e9f29
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import EccoReadingProcessor
+
+
+class TestEccoReadingProcessor(unittest.TestCase):
+
+    def test_generate_tile(self):
+        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+                                                 latitude='YC',
+                                                 longitude='XC',
+                                                 time='time',
+                                                 tile='tile')
+
+        granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        tile_summary = nexusproto.TileSummary()
+        tile_summary.granule = granule_path
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.CopyFrom(tile_summary)
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'tile': slice(10, 11),
+            'j': slice(0, 15),
+            'i': slice(0, 7)
+        }
+        with xr.open_dataset(granule_path, decode_cf=True) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(output_tile.summary.granule, granule_path)
+            self.assertEqual(output_tile.tile.ecco_tile.tile, 10)
+            self.assertEqual(output_tile.tile.ecco_tile.time, 695563200)
+            self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7])
+            self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7])
+            self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
+
+    def test_generate_tile_with_dims_out_of_order(self):
+        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+                                                 latitude='YC',
+                                                 longitude='XC',
+                                                 time='time',
+                                                 tile='tile')
+        granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        input_tile = nexusproto.NexusTile()
+
+        dimensions_to_slices = {
+            'j': slice(0, 15),
+            'tile': slice(10, 11),
+            'i': slice(0, 7),
+            'time': slice(0, 1)
+        }
+        with xr.open_dataset(granule_path, decode_cf=True) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(output_tile.tile.ecco_tile.tile, 10)
+            self.assertEqual(output_tile.tile.ecco_tile.time, 695563200)
+            self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7])
+            self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7])
+            self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
new file mode 100644
index 0000000..aec3ae8
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.reading_processors import GridReadingProcessor
+
+
+class TestReadMurData(unittest.TestCase):
+
+    def test_read_empty_mur(self):
+        reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/empty_mur.nc4')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'lat': slice(0, 10),
+            'lon': slice(0, 5)
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual(1451638800, output_tile.tile.grid_tile.time)
+            self.assertEqual([10, 5], output_tile.tile.grid_tile.variable_data.shape)
+            self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape)
+            self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+
+            masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+            self.assertEqual(0, np.ma.count(masked_data))
+
+    def test_read_not_empty_mur(self):
+        reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_mur.nc4')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'lat': slice(0, 10),
+            'lon': slice(0, 5)
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual(1451638800, output_tile.tile.grid_tile.time)
+            self.assertEqual([10, 5], output_tile.tile.grid_tile.variable_data.shape)
+            self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape)
+            self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+
+            masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+            self.assertEqual(50, np.ma.count(masked_data))
+
+
+class TestReadCcmpData(unittest.TestCase):
+
+    def test_read_not_empty_ccmp(self):
+        reading_processor = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ccmp.nc')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'latitude': slice(0, 38),
+            'longitude': slice(0, 87)
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual(1451606400, output_tile.tile.grid_tile.time)
+            self.assertEqual([38, 87], output_tile.tile.grid_tile.variable_data.shape)
+            self.assertEqual([38], output_tile.tile.grid_tile.latitude.shape)
+            self.assertEqual([87], output_tile.tile.grid_tile.longitude.shape)
+
+            masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+            self.assertEqual(3306, np.ma.count(masked_data))
+
+        # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_ccmp.nc')
+        #
+        # ccmp_reader = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time', meta='vwnd')
+        #
+        # input_tile = nexusproto.NexusTile()
+        # tile_summary = nexusproto.TileSummary()
+        # tile_summary.granule = "file:%s" % test_file
+        # tile_summary.section_spec = "time:0:1,longitude:0:87,latitude:0:38"
+        # input_tile.summary.CopyFrom(tile_summary)
+        #
+        # results = list(ccmp_reader.process(input_tile))
+        #
+        # self.assertEqual(1, len(results))
+        #
+        # # with open('./ccmp_nonempty_nexustile.bin', 'w') as f:
+        # #     f.write(results[0])
+        #
+        # for nexus_tile in results:
+        #     self.assertTrue(nexus_tile.HasField('tile'))
+        #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+        #     self.assertEqual(1, len(nexus_tile.tile.grid_tile.meta_data))
+        #
+        #     tile = nexus_tile.tile.grid_tile
+        #     self.assertEqual(38, from_shaped_array(tile.latitude).size)
+        #     self.assertEqual(87, from_shaped_array(tile.longitude).size)
+        #     self.assertEqual((1, 38, 87), from_shaped_array(tile.variable_data).shape)
+        #
+        # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))
+        # self.assertEqual(3306, np.ma.count(tile1_data))
+        # self.assertAlmostEqual(-78.375,
+        #                        np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+        #                        places=3)
+        # self.assertAlmostEqual(-69.125,
+        #                        np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+        #                        places=3)
+        #
+        # self.assertEqual(1451606400, results[0].tile.grid_tile.time)
+
+
+class TestReadAvhrrData(unittest.TestCase):
+    def test_read_not_empty_avhrr(self):
+        reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_avhrr.nc4')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'lat': slice(0, 5),
+            'lon': slice(0, 10)
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual(1462060800, output_tile.tile.grid_tile.time)
+            self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape)
+            self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape)
+            self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape)
+
+            masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+            self.assertEqual(50, np.ma.count(masked_data))
+        # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_avhrr.nc4')
+        #
+        # avhrr_reader = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
+        #
+        # input_tile = nexusproto.NexusTile()
+        # tile_summary = nexusproto.TileSummary()
+        # tile_summary.granule = "file:%s" % test_file
+        # tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10"
+        # input_tile.summary.CopyFrom(tile_summary)
+        #
+        # results = list(avhrr_reader.process(input_tile))
+        #
+        # self.assertEqual(1, len(results))
+        #
+        # for nexus_tile in results:
+        #     self.assertTrue(nexus_tile.HasField('tile'))
+        #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+        #
+        #     tile = nexus_tile.tile.grid_tile
+        #     self.assertEqual(10, from_shaped_array(tile.latitude).size)
+        #     self.assertEqual(10, from_shaped_array(tile.longitude).size)
+        #     self.assertEqual((1, 10, 10), from_shaped_array(tile.variable_data).shape)
+        #
+        # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))
+        # self.assertEqual(100, np.ma.count(tile1_data))
+        # self.assertAlmostEqual(-39.875,
+        #                        np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+        #                        places=3)
+        # self.assertAlmostEqual(-37.625,
+        #                        np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))),
+        #                        places=3)
+        #
+        # self.assertEqual(1462060800, results[0].tile.grid_tile.time)
+        # self.assertAlmostEqual(289.71,
+        #                        np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))[
+        #                            0, 0, 0],
+        #                        places=3)
+
+
+class TestReadInterpEccoData(unittest.TestCase):
+    def setUp(self):
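+        # This processor is only referenced by the commented-out legacy test
+        # further down; the live test below builds its own reading processor.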
+        self.module = GridReadingProcessor('OBP', 'latitude', 'longitude', x_dim='i', y_dim='j',
+                                           time='time')
+
+    def test_read_indexed_ecco(self):
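+        # Slices are specified on the native index dimensions (i, j), while the
+        # processor reads the 1-D latitude/longitude coordinate variables.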
+        reading_processor = GridReadingProcessor(variable_to_read='OBP',
+                                                 latitude='latitude',
+                                                 longitude='longitude',
+                                                 time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 1),
+            'j': slice(0, 5),
+            'i': slice(0, 10)
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual(1484568000, output_tile.tile.grid_tile.time)
+            self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape)
+            self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape)
+            self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape)
+
+            masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+            self.assertEqual(50, np.ma.count(masked_data))
+
+        # test_file = path.join(path.dirname(__file__), 'datafiles', 'OBP_2017_01.nc')
+        #
+        # input_tile = nexusproto.NexusTile()
+        # tile_summary = nexusproto.TileSummary()
+        # tile_summary.granule = "file:%s" % test_file
+        # tile_summary.section_spec = "time:0:1,j:0:10,i:0:10"
+        # input_tile.summary.CopyFrom(tile_summary)
+        #
+        # results = list(self.module.process(input_tile))
+        #
+        # self.assertEqual(1, len(results))
+        #
+        # for nexus_tile in results:
+        #     self.assertTrue(nexus_tile.HasField('tile'))
+        #     self.assertTrue(nexus_tile.tile.HasField('grid_tile'))
+        #
+        #     tile = nexus_tile.tile.grid_tile
+        #     self.assertEqual(10, len(from_shaped_array(tile.latitude)))
+        #     self.assertEqual(10, len(from_shaped_array(tile.longitude)))
+        #
+        #     the_data = np.ma.masked_invalid(from_shaped_array(tile.variable_data))
+        #     self.assertEqual((1, 10, 10), the_data.shape)
+        #     self.assertEqual(100, np.ma.count(the_data))
+        #     self.assertEqual(1484568000, tile.time)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
new file mode 100644
index 0000000..55ac4fc
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import SwathReadingProcessor
+
+
+class TestReadAscatbData(unittest.TestCase):
+    def test_read_not_empty_ascatb(self):
+        reading_processor = SwathReadingProcessor(variable_to_read='wind_speed',
+                                                  latitude='lat',
+                                                  longitude='lon',
+                                                  time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
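+        # NUMROWS/NUMCELLS are this swath granule's native dimensions; take one
+        # row across all 82 cells.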
+        dimensions_to_slices = {
+            'NUMROWS': slice(0, 1),
+            'NUMCELLS': slice(0, 82)
+        }
+        with xr.open_dataset(granule_path, decode_cf=True) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual([1, 82], output_tile.tile.swath_tile.time.shape)
+            self.assertEqual([1, 82], output_tile.tile.swath_tile.variable_data.shape)
+            self.assertEqual([1, 82], output_tile.tile.swath_tile.latitude.shape)
+            self.assertEqual([1, 82], output_tile.tile.swath_tile.longitude.shape)
+
+
+class TestReadSmapData(unittest.TestCase):
+    def test_read_not_empty_smap(self):
+        reading_processor = SwathReadingProcessor(
+            variable_to_read='smap_sss',
+            latitude='lat',
+            longitude='lon',
+            time='row_time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
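+        # phony_dim_* names are assigned by the netCDF library when an HDF5
+        # file's dimensions are unnamed.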
+        dimensions_to_slices = {
+            'phony_dim_0': slice(0, 38),
+            'phony_dim_1': slice(0, 1)
+        }
+
+        with xr.open_dataset(granule_path, decode_cf=True) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual([1], output_tile.tile.swath_tile.time.shape)
+            self.assertEqual([38, 1], output_tile.tile.swath_tile.variable_data.shape)
+            self.assertEqual([38, 1], output_tile.tile.swath_tile.latitude.shape)
+            self.assertEqual([38, 1], output_tile.tile.swath_tile.longitude.shape)
diff --git a/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py
new file mode 100644
index 0000000..90ae8bb
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py
@@ -0,0 +1,29 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.processors.reading_processors import TileReadingProcessor
+
+
+class TestEccoReadingProcessor(unittest.TestCase):
+
+    def test_slices_for_variable(self):
+        dimensions_to_slices = {
+            'j': slice(0, 1),
+            'tile': slice(0, 1),
+            'i': slice(0, 1),
+            'time': slice(0, 1)
+        }
+
+        expected = {
+            'tile': slice(0, 1, None),
+            'j': slice(0, 1, None),
+            'i': slice(0, 1, None)
+        }
+
+        granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+        with xr.open_dataset(granule_path, decode_cf=True) as ds:
+            slices = TileReadingProcessor._slices_for_variable(ds['XC'], dimensions_to_slices)
+            self.assertEqual(slices, expected)
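+
+# A minimal sketch of the behavior this test exercises (an assumption for
+# illustration, not necessarily how TileReadingProcessor implements it):
+#
+#     def _slices_for_variable(variable, dimension_to_slices):
+#         # keep only the slices for dimensions the variable actually has
+#         return {dim: dimension_to_slices[dim] for dim in variable.dims}
+#
+# XC appears to have dims (tile, j, i), so the 'time' slice is dropped.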
diff --git a/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..a936885
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import TimeSeriesReadingProcessor
+
+
+class TestReadWSWMData(unittest.TestCase):
+
+    def test_read_not_empty_wswm(self):
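+        # A time-series granule: a single 'rivid' across all 5832 time steps.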
+        reading_processor = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', time='time')
+        granule_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc')
+
+        input_tile = nexusproto.NexusTile()
+        input_tile.summary.granule = granule_path
+
+        dimensions_to_slices = {
+            'time': slice(0, 5832),
+            'rivid': slice(0, 1),
+        }
+        with xr.open_dataset(granule_path) as ds:
+            output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
+
+            self.assertEqual(granule_path, output_tile.summary.granule)
+            self.assertEqual([5832], output_tile.tile.time_series_tile.time.shape)
+            self.assertEqual([5832, 1], output_tile.tile.time_series_tile.variable_data.shape)
+            self.assertEqual([1], output_tile.tile.time_series_tile.latitude.shape)
+            self.assertEqual([1], output_tile.tile.time_series_tile.longitude.shape)
+
+        # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_wswm.nc')
+        # wswm_reader = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', 'time')
+        #
+        # input_tile = nexusproto.NexusTile()
+        # tile_summary = nexusproto.TileSummary()
+        # tile_summary.granule = "file:%s" % test_file
+        # tile_summary.section_spec = "time:0:5832,rivid:0:1"
+        # input_tile.summary.CopyFrom(tile_summary)
+        #
+        # results = list(wswm_reader.process(input_tile))
+        #
+        # self.assertEqual(1, len(results))
+        #
+        # for nexus_tile in results:
+        #     self.assertTrue(nexus_tile.HasField('tile'))
+        #     self.assertTrue(nexus_tile.tile.HasField('time_series_tile'))
+        #
+        #     tile = nexus_tile.tile.time_series_tile
+        #     self.assertEqual(1, from_shaped_array(tile.latitude).size)
+        #     self.assertEqual(1, from_shaped_array(tile.longitude).size)
+        #     self.assertEqual((5832, 1), from_shaped_array(tile.variable_data).shape)
+        #
+        # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data))
+        # self.assertEqual(5832, np.ma.count(tile1_data))
+        # self.assertAlmostEqual(45.837,
+        #                        np.ma.min(
+        #                            np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.latitude))),
+        #                        places=3)
+        # self.assertAlmostEqual(-122.789,
+        #                        np.ma.max(
+        #                            np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.longitude))),
+        #                        places=3)
+        #
+        # tile1_times = from_shaped_array(results[0].tile.time_series_tile.time)
+        # self.assertEqual(852098400, tile1_times[0])
+        # self.assertEqual(915073200, tile1_times[-1])
+        # self.assertAlmostEqual(1.473,
+        #                        np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data))[
+        #                            0, 0],
+        #                        places=3)
diff --git a/granule_ingester/tests/slicers/__init__.py b/granule_ingester/tests/slicers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/slicers/test_SliceFileByDimension.py b/granule_ingester/tests/slicers/test_SliceFileByDimension.py
new file mode 100644
index 0000000..e86442b
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByDimension.py
@@ -0,0 +1,122 @@
+# import unittest
+# from collections import Set
+#
+# from netCDF4 import Dataset
+# from granule_ingester.slicers.SliceFileByDimension import SliceFileByDimension
+#
+#
+# class TestSliceFileByDimension(unittest.TestCase):
+#
+#     def test_generate_slices(self):
+#         netcdf_path = 'tests/granules/THETA_199201.nc'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByDimension(slice_dimension='depth',
+#                                       dimension_name_prefix=None)
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = ['nv:0:2,time:0:1,longitude:0:720,depth:0:1,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:1:2,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:2:3,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:3:4,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:4:5,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:5:6,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:6:7,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:7:8,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:8:9,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:9:10,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:10:11,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:11:12,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:12:13,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:13:14,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:14:15,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:15:16,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:16:17,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:17:18,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:18:19,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:19:20,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:20:21,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:21:22,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:22:23,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:23:24,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:24:25,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:25:26,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:26:27,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:27:28,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:28:29,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:29:30,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:30:31,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:31:32,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:32:33,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:33:34,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:34:35,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:35:36,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:36:37,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:37:38,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:38:39,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:39:40,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:40:41,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:41:42,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:42:43,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:43:44,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:44:45,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:45:46,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:46:47,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:47:48,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:48:49,latitude:0:360',
+#                            'nv:0:2,time:0:1,longitude:0:720,depth:49:50,latitude:0:360']
+#
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_generate_slices_indexed(self):
+#         netcdf_path = 'tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByDimension(slice_dimension='2',
+#                                       dimension_name_prefix='phony_dim_')
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = [
+#             'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:0:1',
+#             'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:1:2',
+#             'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:2:3',
+#             'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:3:4'
+#         ]
+#
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_indexed_dimension_slicing(self):
+#         # the netCDF library automatically names unnamed (integer-indexed) HDF5 dimensions with the "phony_dim_" prefix
+#         dimension_specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5}
+#         slicer = SliceFileByDimension(slice_dimension='2',
+#                                       dimension_name_prefix=None)
+#         boundary_slices = slicer._indexed_dimension_slicing(dimension_specs)
+#         expected_slices = [
+#             'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:0:1',
+#             'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:1:2',
+#             'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:2:3',
+#             'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:3:4',
+#             'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:4:5'
+#         ]
+#
+#         self.assertEqual(boundary_slices, expected_slices)
+#
+#     def test_generate_tile_boundary_slices(self):
+#         dimension_specs = {'lat': 8, 'lon': 8, 'depth': 5}
+#         slicer = SliceFileByDimension(slice_dimension='depth',
+#                                       dimension_name_prefix=None)
+#         boundary_slices = slicer._generate_tile_boundary_slices(slicer._slice_by_dimension, dimension_specs)
+#         expected_slices = [
+#             'lat:0:8,lon:0:8,depth:0:1',
+#             'lat:0:8,lon:0:8,depth:1:2',
+#             'lat:0:8,lon:0:8,depth:2:3',
+#             'lat:0:8,lon:0:8,depth:3:4',
+#             'lat:0:8,lon:0:8,depth:4:5'
+#         ]
+#
+#         self.assertEqual(boundary_slices, expected_slices)
+#
+# if __name__ == '__main__':
+#     unittest.main()
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
new file mode 100644
index 0000000..7a8dd51
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -0,0 +1,105 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
+
+
+class TestSliceFileByStepSize(unittest.TestCase):
+
+    def test_generate_slices(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/THETA_199201.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            dimension_steps = {'nv': 2, 'time': 1, 'latitude': 180, 'longitude': 180, 'depth': 2}
+            slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+            slices = slicer._generate_slices(dimension_specs=dataset.dims)
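+            # Expect the cross-product of per-dimension chunks, with dimension
+            # names rendered in sorted order within each slice string.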
+            expected_slices = [
+                'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:540:720,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:540:720,nv:0:2,time:0:1'
+            ]
+
+            self.assertEqual(expected_slices, slices)
+
+    def test_generate_slices_indexed(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            dimension_steps = {'phony_dim_0': 76, 'phony_dim_1': 812, 'phony_dim_2': 1}
+            slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+            slices = slicer._generate_slices(dimension_specs=dataset.dims)
+            expected_slices = [
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:0:1',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:1:2',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:2:3',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:3:4',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:0:1',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:1:2',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:2:3',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:3:4'
+            ]
+
+            self.assertEqual(expected_slices, slices)
+
+    def test_generate_chunk_boundary_slices(self):
+        dimension_specs = {'time': 5832, 'rivid': 43}
+        dimension_steps = {'time': 2916, 'rivid': 5}
+        slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+        boundary_slices = slicer._generate_chunk_boundary_slices(dimension_specs)
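+        # The final rivid chunk is truncated at the dimension bound (40:43).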
+        expected_slices = [
+            'time:0:2916,rivid:0:5',
+            'time:0:2916,rivid:5:10',
+            'time:0:2916,rivid:10:15',
+            'time:0:2916,rivid:15:20',
+            'time:0:2916,rivid:20:25',
+            'time:0:2916,rivid:25:30',
+            'time:0:2916,rivid:30:35',
+            'time:0:2916,rivid:35:40',
+            'time:0:2916,rivid:40:43',
+            'time:2916:5832,rivid:0:5',
+            'time:2916:5832,rivid:5:10',
+            'time:2916:5832,rivid:10:15',
+            'time:2916:5832,rivid:15:20',
+            'time:2916:5832,rivid:20:25',
+            'time:2916:5832,rivid:25:30',
+            'time:2916:5832,rivid:30:35',
+            'time:2916:5832,rivid:35:40',
+            'time:2916:5832,rivid:40:43',
+        ]
+
+        self.assertEqual(boundary_slices, expected_slices)
+
+    def test_generate_chunk_boundary_slices_indexed(self):
+        dimension_steps = {'phony_dim_0': 4, 'phony_dim_1': 4, 'phony_dim_2': 3}
+        dimension_specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5}
+        slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+        boundary_slices = slicer._generate_slices(dimension_specs)
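+        # phony_dim_2 (size 5, step 3) likewise yields a truncated final chunk (3:5).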
+        expected_slices = [
+            'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:0:3',
+            'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:3:5',
+            'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:0:3',
+            'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:3:5',
+            'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:0:3',
+            'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:3:5',
+            'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:0:3',
+            'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:3:5',
+        ]
+
+        self.assertEqual(boundary_slices, expected_slices)
+
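+# A minimal sketch of the per-dimension chunking these tests exercise (an
+# illustrative assumption, not the actual SliceFileByStepSize code):
+#
+#     def chunk_bounds(size, step):
+#         # the final chunk is truncated at the dimension bound
+#         return [(start, min(start + step, size)) for start in range(0, size, step)]
+#
+# The slice strings are the cross-product of these bounds over all dimensions;
+# e.g. chunk_bounds(43, 5)[-1] == (40, 43), which renders as 'rivid:40:43'.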
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
new file mode 100644
index 0000000..772c63e
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
@@ -0,0 +1,88 @@
+# import unittest
+# from collections import Set
+#
+# from netCDF4 import Dataset
+# from granule_ingester.slicers.SliceFileByTilesDesired import SliceFileByTilesDesired
+#
+#
+# class TestSliceFileByTilesDesired(unittest.TestCase):
+#
+#     def test_generate_slices(self):
+#         netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=['lat', 'lon'])
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = ['lat:0:509,lon:0:1018',
+#                            'lat:0:509,lon:1018:1440',
+#                            'lat:509:720,lon:0:1018',
+#                            'lat:509:720,lon:1018:1440']
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_generate_slices_with_time(self):
+#         netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=[
+#                                              'lat', 'lon'],
+#                                          time_dimension=('time', 3))
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = ['time:0:1,lat:0:509,lon:0:1018',
+#                            'time:1:2,lat:0:509,lon:0:1018',
+#
+#                            'time:0:1,lat:0:509,lon:1018:1440',
+#                            'time:1:2,lat:0:509,lon:1018:1440',
+#
+#                            'time:0:1,lat:509:720,lon:0:1018',
+#                            'time:1:2,lat:509:720,lon:0:1018',
+#
+#                            'time:0:1,lat:509:720,lon:1018:1440',
+#                            'time:1:2,lat:509:720,lon:1018:1440']
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_calculate_step_size_perfect_split_2dim(self):
+#         step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 2)
+#         self.assertAlmostEqual(step_size, 100.0)
+#
+#     def test_calculate_step_size_perfect_split_3dim(self):
+#         step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 3)
+#         self.assertAlmostEqual(step_size, 215.0)
+#
+#     def test_generate_spatial_slices(self):
+#         dimension_specs = {'lat': 8, 'lon': 8}
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=dimension_specs)
+#         boundary_slices = slicer._generate_spatial_slices(tiles_desired=4,
+#                                                           dimension_specs=dimension_specs)
+#         expected_slices = [
+#             'lat:0:4,lon:0:4',
+#             'lat:0:4,lon:4:8',
+#             'lat:4:8,lon:0:4',
+#             'lat:4:8,lon:4:8'
+#         ]
+#         self.assertEqual(boundary_slices, expected_slices)
+#
+#     def test_generate_temporal_slices(self):
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=None)
+#         time_slices = slicer._generate_temporal_slices(('time', 10))
+#         expected_time_slices = ['time:0:1',
+#                                 'time:1:2',
+#                                 'time:2:3',
+#                                 'time:3:4',
+#                                 'time:4:5',
+#                                 'time:5:6',
+#                                 'time:6:7',
+#                                 'time:7:8',
+#                                 'time:8:9']
+#         self.assertEqual(time_slices, expected_time_slices)
+#
+#
+# if __name__ == '__main__':
+#     unittest.main()
diff --git a/granule_ingester/tests/slicers/test_TileSlicer.py b/granule_ingester/tests/slicers/test_TileSlicer.py
new file mode 100644
index 0000000..c3ad97f
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_TileSlicer.py
@@ -0,0 +1,68 @@
+import asyncio
+import os
+import unittest
+from granule_ingester.slicers.TileSlicer import TileSlicer
+import xarray as xr
+
+
+class TestTileSlicer(unittest.TestCase):
+    class ToyTileSlicer(TileSlicer):
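+        # Stub slicer with a fixed slice list, so the TileSlicer base-class
+        # behavior can be tested independently of any real slicing strategy.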
+        def _generate_slices(self, dimensions):
+            return [
+                'time:0:1,lat:0:4,lon:0:4',
+                'time:1:2,lat:0:4,lon:0:4',
+                'time:2:3,lat:0:4,lon:0:4',
+
+                'time:0:1,lat:0:4,lon:4:8',
+                'time:1:2,lat:0:4,lon:4:8',
+                'time:2:3,lat:0:4,lon:4:8',
+
+                'time:0:1,lat:4:8,lon:0:4',
+                'time:1:2,lat:4:8,lon:0:4',
+                'time:2:3,lat:4:8,lon:0:4',
+
+                'time:0:1,lat:4:8,lon:4:8',
+                'time:1:2,lat:4:8,lon:4:8',
+                'time:2:3,lat:4:8,lon:4:8'
+            ]
+
+    def test_generate_tiles(self):
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertEqual(file_path, slicer._granule_name)
+        self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    # def test_open_s3(self):
+    #     s3_path = 's3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=s3_path)
+    #
+    #     expected_slices = slicer._generate_slices(None)
+    #     asyncio.run(slicer.open())
+    #     self.assertIsNotNone(slicer.dataset)
+    #     self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    def test_next(self):
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+        generated_tiles = list(slicer)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertListEqual(expected_slices, [tile.summary.section_spec for tile in generated_tiles])
+        for tile in generated_tiles:
+            self.assertEqual(file_path, tile.summary.granule)
+
+    # def test_download_s3_file(self):
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=None)
+    #
+    #     asyncio.run(slicer._download_s3_file(
+    #         "s3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc"))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/granule_ingester/tests/writers/__init__.py b/granule_ingester/tests/writers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py
new file mode 100644
index 0000000..76b85ac
--- /dev/null
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -0,0 +1,54 @@
+import unittest
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.writers import SolrStore
+
+
+class TestSolrStore(unittest.TestCase):
+
+    def test_build_solr_doc(self):
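+        # Populate a NexusTile summary with fixture values and check the
+        # resulting Solr document field by field.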
+        tile = nexusproto.NexusTile()
+        tile.summary.tile_id = 'test_id'
+        tile.summary.dataset_name = 'test_dataset'
+        tile.summary.dataset_uuid = 'test_dataset_id'
+        tile.summary.data_var_name = 'test_variable'
+        tile.summary.granule = 'test_granule_path'
+        tile.summary.section_spec = 'time:0:1,j:0:20,i:200:240'
+        tile.summary.bbox.lat_min = -180.1
+        tile.summary.bbox.lat_max = 180.2
+        tile.summary.bbox.lon_min = -90.5
+        tile.summary.bbox.lon_max = 90.0
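+        # Note: these bbox values are arbitrary fixtures (the lat range exceeds
+        # +/-90); the 'geo' WKT polygon below lists points as 'lon lat' pairs.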
+        tile.summary.stats.min = -10.0
+        tile.summary.stats.max = 25.5
+        tile.summary.stats.mean = 12.5
+        tile.summary.stats.count = 100
+        tile.summary.stats.min_time = 694224000
+        tile.summary.stats.max_time = 694310400
+
+        tile.tile.ecco_tile.depth = 10.5
+
+        metadata_store = SolrStore()
+        solr_doc = metadata_store._build_solr_doc(tile)
+
+        self.assertEqual('sea_surface_temp', solr_doc['table_s'])
+        self.assertEqual(
+            'POLYGON((-90.500 -180.100, 90.000 -180.100, 90.000 180.200, -90.500 180.200, -90.500 -180.100))',
+            solr_doc['geo'])
+        self.assertEqual('test_id', solr_doc['id'])
+        self.assertEqual('test_dataset!test_id', solr_doc['solr_id_s'])
+        self.assertEqual('time:0:1,j:0:20,i:200:240', solr_doc['sectionSpec_s'])
+        self.assertEqual('test_granule_path', solr_doc['granule_s'])
+        self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
+        self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
+        self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
+        self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'])
+        self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'])
+        self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
+        self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
+        self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
+        self.assertAlmostEqual(25.5, solr_doc['tile_max_val_d'])
+        self.assertAlmostEqual(12.5, solr_doc['tile_avg_val_d'])
+        self.assertEqual(100, solr_doc['tile_count_i'])
+        self.assertAlmostEqual(10.5, solr_doc['tile_depth'])