You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 23:06:02 UTC

[incubator-sdap-ingester] branch rabbitmq-fix updated (2525001 -> efedce7)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard 2525001  use asyncio in the collection ingester
 discard cdd30ae  solr history bug fixes
 discard fd5cc22  use pysolr
 discard 385f5b8  error handling
 discard 41755a5  exc handling
 discard de8c5a0  propagate child worker exceptions up to main process
 discard 562ef6f  the healthchecks now raise exceptions if they fail
 discard 0c97d99  error handling
 discard 7feb58d  better exception handling
 discard 6efb623  better error handling
     new efedce7  better error handling

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2525001)
            \
             N -- N -- N   refs/heads/rabbitmq-fix (efedce7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-sdap-ingester] 01/01: better error handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit efedce735b1fd47bf8fbcf2e2c6b815dcd32d50e
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500

    better error handling
    
    better exception handling
    
    error handling
    
    the healthchecks now raise exceptions if they fail
    
    propagate child worker exceptions up to main process
    
    exc handling
    
    error handling
    
    use pysolr
    
    solr history bug fixes
    
    use asyncio in the collection ingester
---
 collection_manager/collection_manager/main.py      |  36 +++----
 .../services/CollectionProcessor.py                |   8 +-
 .../services/CollectionWatcher.py                  |  34 +++---
 .../services/MessagePublisher.py                   |  41 +++----
 .../history_manager/SolrIngestionHistory.py        |  19 ++--
 collection_manager/requirements.txt                |   2 +
 granule_ingester/conda-requirements.txt            |   2 +-
 granule_ingester/docker/entrypoint.sh              |   5 +-
 .../granule_ingester/consumer/Consumer.py          |  40 +++++--
 .../granule_ingester/exceptions/Exceptions.py      |  41 +++++++
 .../granule_ingester/exceptions/__init__.py        |  11 ++
 granule_ingester/granule_ingester/main.py          |  61 ++++++++---
 .../granule_ingester/pipeline/Pipeline.py          | 119 ++++++++++++++-------
 .../reading_processors/TileReadingProcessor.py     |  20 ++--
 .../granule_ingester/writers/CassandraStore.py     |  39 +++++--
 .../granule_ingester/writers/DataStore.py          |   1 +
 .../granule_ingester/writers/SolrStore.py          |  79 +++++++++-----
 granule_ingester/requirements.txt                  |   5 +-
 granule_ingester/tests/pipeline/test_Pipeline.py   |   9 +-
 19 files changed, 383 insertions(+), 189 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 7e72de5..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,28 +63,26 @@ async def main():
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
         else:
             history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
-        publisher = MessagePublisher(host=options.rabbitmq_host,
-                                     username=options.rabbitmq_username,
-                                     password=options.rabbitmq_password,
-                                     queue=options.rabbitmq_queue)
-        publisher.connect()
-        collection_processor = CollectionProcessor(message_publisher=publisher,
-                                                   history_manager_builder=history_manager_builder)
-        collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                               collection_updated_callback=collection_processor.process_collection,
-                                               granule_updated_callback=collection_processor.process_granule,
-                                               collections_refresh_interval=int(options.refresh))
+        async with MessagePublisher(host=options.rabbitmq_host,
+                                    username=options.rabbitmq_username,
+                                    password=options.rabbitmq_password,
+                                    queue=options.rabbitmq_queue) as publisher:
+            collection_processor = CollectionProcessor(message_publisher=publisher,
+                                                       history_manager_builder=history_manager_builder)
+            collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+                                                   collection_updated_callback=collection_processor.process_collection,
+                                                   granule_updated_callback=collection_processor.process_granule,
+                                                   collections_refresh_interval=int(options.refresh))
 
-        collection_watcher.start_watching()
-
-        while True:
-            try:
-                await asyncio.sleep(1)
-            except KeyboardInterrupt:
-                return
+            await collection_watcher.start_watching()
+            while True:
+                try:
+                    await asyncio.sleep(1)
+                except KeyboardInterrupt:
+                    return
 
     except Exception as e:
-        logger.error(e)
+        logger.exception(e)
         return
 
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
         with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
             self._config_template = config_template_file.read()
 
-    def process_collection(self, collection: Collection):
+    async def process_collection(self, collection: Collection):
         """
         Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
         :param collection: A Collection definition
         :return: None
         """
         for granule in collection.files_owned():
-            self.process_granule(granule, collection)
+            await self.process_granule(granule, collection)
 
-    def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
-        self._publisher.publish_message(body=dataset_config, priority=use_priority)
+        await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         history_manager.push(granule)
 
     @staticmethod
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..0d7da84 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,6 @@ import asyncio
 import logging
 import os
 from collections import defaultdict
-from functools import partial
 from typing import Dict, Callable, Set, Optional
 
 import yaml
@@ -38,7 +37,7 @@ class CollectionWatcher:
 
         self._granule_watches = set()
 
-    def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+    async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
         """
         Periodically load the Collections Configuration file to check for changes,
         and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +45,7 @@ class CollectionWatcher:
         :return: None
         """
 
-        self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+        await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
         self._observer.start()
 
     def collections(self) -> Set[Collection]:
@@ -99,11 +98,11 @@ class CollectionWatcher:
         self._load_collections()
         return self.collections() - old_collections
 
-    def _reload_and_reschedule(self):
+    async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             for collection in updated_collections:
-                self._collection_updated_callback(collection)
+                await self._collection_updated_callback(collection)
             if len(updated_collections) > 0:
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -117,7 +116,9 @@ class CollectionWatcher:
 
     def _schedule_watches(self):
         for directory, collections in self._collections_by_dir.items():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+                                                         self._granule_updated_callback,
+                                                         collections)
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
@@ -127,18 +128,22 @@ class CollectionWatcher:
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
     @classmethod
-    def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                coro,
+                                *args):
         """
         Call a function periodically. This uses asyncio, and is non-blocking.
         :param loop: An optional event loop to use. If None, the current running event loop will be used.
         :param wait_time: seconds to wait between iterations of func
-        :param func: the function that will be run
+        :param coro: the coroutine that will be awaited
         :param args: any args that need to be provided to func
         """
         if loop is None:
             loop = asyncio.get_running_loop()
-        func(*args)
-        loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+        await coro(*args)
+        loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,15 +151,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
-        self._callback = callback
+    def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
+        self._loop = loop
+        self._callback_coro = callback_coro
         self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
 
     def on_modified(self, event):
         super().on_modified(event)
@@ -163,4 +169,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
 
 
 class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
     def __init__(self, host: str, username: str, password: str, queue: str):
         self._connection_string = f"amqp://{username}:{password}@{host}/"
         self._queue = queue
-        self._channel = None
-        self._connection = None
+        self._channel: Channel = None
+        self._connection: Connection = None
 
-    def connect(self):
+    async def __aenter__(self):
+        await self._connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._connection:
+            await self._connection.close()
+
+    async def _connect(self):
         """
         Establish a connection to RabbitMQ.
         :return: None
         """
-        parameters = pika.URLParameters(self._connection_string)
-        self._connection = pika.BlockingConnection(parameters)
-        self._channel = self._connection.channel()
-        self._channel.queue_declare(self._queue, durable=True)
+        self._connection = await connect_robust(self._connection_string)
+        self._channel = await self._connection.channel()
+        await self._channel.declare_queue(self._queue, durable=True)
 
-    def publish_message(self, body: str, priority: int = None):
+    @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+    async def publish_message(self, body: str, priority: int = None):
         """
         Publish a message to RabbitMQ using the optional message priority.
         :param body: A string to publish to RabbitMQ
         :param priority: An optional integer priority to use for the message
         :return: None
         """
-        properties = pika.BasicProperties(content_type='text/plain',
-                                          delivery_mode=1,
-                                          priority=priority)
-        self._channel.basic_publish(exchange='',
-                                    routing_key=self._queue,
-                                    body=body,
-                                    properties=properties)
-
-    def __del__(self):
-        if self._connection:
-            self._connection.close()
+        message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+        await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 1ae7156..4e6d3e5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -35,8 +35,8 @@ class SolrIngestionHistory(IngestionHistory):
         try:
             self._solr_url = solr_url
             self._create_collection_if_needed()
-            self._solr_granules = pysolr.Solr('/'.join([solr_url.strip('/'), self._granule_collection_name]))
-            self._solr_datasets = pysolr.Solr('/'.join([solr_url.strip('/'), self._dataset_collection_name]))
+            self._solr_granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._granule_collection_name}")
+            self._solr_datasets = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
             self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
@@ -63,7 +63,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': self._latest_ingested_file_update}])
+                'latest_update_l': int(self._latest_ingested_file_update)}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):
@@ -87,8 +87,7 @@ class SolrIngestionHistory(IngestionHistory):
                 self._req_session = requests.session()
 
             payload = {'action': 'CLUSTERSTATUS'}
-            result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                           params=payload)
+            result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
             response = result.json()
             node_number = len(response['cluster']['live_nodes'])
 
@@ -100,12 +99,11 @@ class SolrIngestionHistory(IngestionHistory):
                            'name': self._granule_collection_name,
                            'numShards': node_number
                            }
-                result = self._req_session.get('/'.join([self._solr_url.strip("/"), 'admin', 'collections']),
-                                               params=payload)
+                result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
                 response = result.json()
                 logger.info(f"solr collection created {response}")
                 # Update schema
-                schema_url = '/'.join([self._solr_url.strip('/'), self._granule_collection_name, 'schema'])
+                schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
                 # granule_s # dataset_s so that all the granule of a dataset are less likely to be on the same shard
                 # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
                 self._add_field(schema_url, "dataset_s", "StrField")
@@ -121,13 +119,12 @@ class SolrIngestionHistory(IngestionHistory):
                            'name': self._dataset_collection_name,
                            'numShards': node_number
                            }
-                result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                               params=payload)
+                result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
                 response = result.json()
                 logger.info(f"solr collection created {response}")
                 # Update schema
                 # http://localhost:8983/solr/nexusdatasets/schema?_=1588555874864&wt=json
-                schema_url = '/'.join([self._solr_url.strip('/'), self._dataset_collection_name, 'schema'])
+                schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
                 # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
                 self._add_field(schema_url, "dataset_s", "StrField")
                 self._add_field(schema_url, "latest_update_l", "TrieLongField")
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..47ae867 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,5 @@ pysolr==3.8.1
 pika==1.1.0
 watchdog==0.10.2
 requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index e6f7262..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,4 +7,7 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,6 +17,8 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -39,7 +41,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -47,10 +49,10 @@ class Consumer(HealthCheck):
             await connection.close()
             return True
         except:
-            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
-            return False
+            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+                                                 f"Connection string was {self._connection_string}")
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -73,16 +75,34 @@ class Consumer(HealthCheck):
                                             data_store_factory=data_store_factory,
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
-            message.ack()
+            await message.ack()
+        except PipelineBuildingError as e:
+            await message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            await message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
-            message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            await message.reject(requeue=True)
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                await queue_iter.close()
+                await channel.close()
+                raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..c648b99
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,41 @@
+class PipelineBuildingError(Exception):
+    pass
+
+
+class PipelineRunningError(Exception):
+    pass
+
+
+class TileProcessingError(Exception):
+    pass
+
+
+class LostConnectionError(Exception):
+    pass
+
+
+class RabbitMQLostConnectionError(LostConnectionError):
+    pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
+    pass
+
+class SolrLostConnectionError(LostConnectionError):
+    pass
+
+
+class FailedHealthCheckError(Exception):
+    pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
+    pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..ea0969f
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1,11 @@
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
+from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
+from .Exceptions import SolrFailedHealthCheckError
+from .Exceptions import SolrLostConnectionError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -16,23 +16,24 @@
 import argparse
 import asyncio
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
-def cassandra_factory(contact_points, port):
-    store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+    store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
     store.connect()
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -44,7 +45,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -72,10 +73,20 @@ async def main():
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--cassandra_username',
+                        metavar="USERNAME",
+                        default=None,
+                        help='Cassandra username. Optional.')
+    parser.add_argument('--cassandra_password',
+                        metavar="PASSWORD",
+                        default=None,
+                        help='Cassandra password. Optional.')
     parser.add_argument('--solr_host_and_port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -96,24 +107,42 @@ async def main():
 
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
-    if await run_health_checks(
-            [CassandraStore(cassandra_contact_points, cassandra_port),
-             SolrStore(solr_host_and_port),
-             consumer]):
+                        data_store_factory=partial(cassandra_factory,
+                                                   cassandra_contact_points,
+                                                   cassandra_port,
+                                                   cassandra_username,
+                                                   cassandra_password),
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+    try:
+        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
+                                 solr_store,
+                                 consumer])
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
             await consumer.start_consuming()
-    else:
-        logger.error("Quitting because not all dependencies passed the health checks.")
+    except FailedHealthCheckError as e:
+        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+    except LostConnectionError as e:
+        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+    except Exception as e:
+        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+    finally:
+        sys.exit(1)
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,16 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
+import os
+import pickle
 import time
+from multiprocessing import Manager
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
+from aiomultiprocess import Pool
+from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
+from yaml.scanner import ScannerError
 
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -31,19 +37,23 @@ from granule_ingester.writers import DataStore, MetadataStore
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
 _worker_processor_list: List[TileProcessor] = None
 _worker_dataset = None
+_shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
     global _worker_data_store
     global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
+    global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
     # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -51,19 +61,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
+    _shared_memory = shared_memory
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
-    global _worker_data_store
-    global _worker_metadata_store
-    global _worker_processor_list
-    global _worker_dataset
+    try:
+        input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+        processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
-    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-    if processed_tile:
-        await _worker_data_store.save_data(processed_tile)
-        await _worker_metadata_store.save_metadata(processed_tile)
+        if processed_tile:
+            await _worker_data_store.save_data(processed_tile)
+            await _worker_metadata_store.save_metadata(processed_tile)
+    except Exception as e:
+        pickling_support.install(e)
+        _shared_memory.error = pickle.dumps(e)
+        raise
 
 
 def _recurse(processor_list: List[TileProcessor],
@@ -88,40 +100,56 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
-    @classmethod
-    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings)
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
+        # Create a SyncManager so that we can communicate exceptions from the
+        # worker processes back to the main process.
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
                         module_mappings: dict):
-        granule_loader = GranuleLoader(**config['granule'])
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
 
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
 
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+            return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -136,23 +164,36 @@ class Pipeline:
         return processor_module
 
     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,11 +18,14 @@ import asyncio
 import logging
 import uuid
 
-from cassandra.cluster import Cluster, Session
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -37,8 +40,10 @@ class TileModel(Model):
 
 
 class CassandraStore(DataStore):
-    def __init__(self, contact_points=None, port=9042):
+    def __init__(self, contact_points=None, port=9042, username=None, password=None):
         self._contact_points = contact_points
+        self._username = username
+        self._password = password
         self._port = port
         self._session = None
 
@@ -47,12 +52,22 @@ class CassandraStore(DataStore):
             session = self._get_session()
             session.shutdown()
             return True
-        except:
-            logger.error("Cannot connect to Cassandra!")
-            return False
+        except Exception:
+            raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
 
     def _get_session(self) -> Session:
-        cluster = Cluster(contact_points=self._contact_points, port=self._port)
+
+        if self._username and self._password:
+            auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+        else:
+            auth_provider = None
+
+        cluster = Cluster(contact_points=self._contact_points,
+                          port=self._port,
+                          # load_balancing_policy=
+                          reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+                          default_retry_policy=RetryPolicy(),
+                          auth_provider=auth_provider)
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
@@ -65,10 +80,14 @@ class CassandraStore(DataStore):
             self._session.shutdown()
 
     async def save_data(self, tile: NexusTile) -> None:
-        tile_id = uuid.UUID(tile.summary.tile_id)
-        serialized_tile_data = TileData.SerializeToString(tile.tile)
-        prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
-        await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+        try:
+            tile_id = uuid.UUID(tile.summary.tile_id)
+            serialized_tile_data = TileData.SerializeToString(tile.tile)
+            prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            await self._execute_query_async(self._session, prepared_query,
+                                            [tile_id, bytearray(serialized_tile_data)])
+        except NoHostAvailable:
+            raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
 
 class DataStore(HealthCheck, ABC):
 
+
     @abstractmethod
     def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
 from nexusproto.DataTile_pb2 import *
-from tenacity import *
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
 
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            logger.error("Cannot connect to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,6 @@
 cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
 aioboto3
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)