You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:16:07 UTC

[incubator-sdap-ingester] branch async-history updated (a5d9a8f -> 54a1335)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard a5d9a8f  async solr history
    omit 4d816f6  SDAP-277: Improved error handling in Granule Ingester (#15)
    omit fe6a1c5  SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
    omit d94c89f  SDAP-266: add README note on synchronization of configmap with local path (#14)
    omit 68110d0  SDAP-273: Configure max threads in Granule Ingester (#13)
     add 13e7b9b  log all fs events
     new fec3299  SDAP-273: Configure max threads in Granule Ingester (#13)
     new 6ebf49f  SDAP-266: add README note on synchronization of configmap with local path (#14)
     new 03badb4  SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
     new dc00585  SDAP-277: Improved error handling in Granule Ingester (#15)
     new 54a1335  async solr history

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a5d9a8f)
            \
             N -- N -- N   refs/heads/async-history (54a1335)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 collection_manager/collection_manager/services/CollectionWatcher.py | 5 +++++
 1 file changed, 5 insertions(+)


[incubator-sdap-ingester] 05/05: async solr history

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 54a13358540715e511010e2761366c459e649713
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700

    async solr history
---
 .../services/CollectionProcessor.py                |  8 +++----
 .../services/CollectionWatcher.py                  |  1 -
 .../history_manager/FileIngestionHistory.py        |  3 ++-
 .../services/history_manager/IngestionHistory.py   | 26 +++++++++++-----------
 .../history_manager/SolrIngestionHistory.py        | 11 +++++----
 collection_manager/docker/Dockerfile               | 12 +++++-----
 config_operator/config_operator/main.py            |  1 +
 .../granule_ingester/writers/SolrStore.py          |  1 +
 8 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
                         f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
-                        f"time range for collection '{collection.dataset_id}'. Skipping.")
+            logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                         f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        history_manager.push(granule)
+        await history_manager.push(granule)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 63dd30c..0ae2b49 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
-from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
         Constructor
         :param history_path:
         :param dataset_id:
-        :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+        :param signature_fun: function which creates the signature of the cache,
+                              a file path string as argument and returns a string (md5sum, time stamp)
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    def push(self, file_path: str):
+    async def push(self, file_path: str):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        self._push_record(file_name, signature)
+        await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
             self._latest_ingested_file_update = os.path.getmtime(file_path)
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
-        self._save_latest_timestamp()
+        await self._save_latest_timestamp()
 
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    def already_ingested(self, file_path: str) -> bool:
+    async def already_ingested(self, file_path: str) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        return signature == self._get_signature(file_name)
+        return signature == await self._get_signature(file_name)
 
-    def get_granule_status(self,
-                           file_path: str,
-                           date_from: datetime = None,
-                           date_to: datetime = None) -> GranuleStatus:
+    async def get_granule_status(self,
+                                 file_path: str,
+                                 date_from: datetime = None,
+                                 date_to: datetime = None) -> GranuleStatus:
         """
         Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
         and and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
         """
         if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         pass
 
     @abstractmethod
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         pass
 
     @abstractmethod
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         pass
 
     @staticmethod
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
 import pysolr
 import requests
 
+from common.async_utils.AsyncUtils import run_in_executor
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
 from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
+logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
     def __del__(self):
         self._req_session.close()
 
+    @run_in_executor
     def _push_record(self, file_name, signature):
         hash_id = doc_key(self._dataset_id, file_name)
         self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
+    @run_in_executor
     def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
         else:
             return None
 
+    @run_in_executor
     def _get_signature(self, file_name):
         hash_id = doc_key(self._dataset_id, file_name)
         results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "granule_s", "string")
                 self._add_field(schema_endpoint, "granule_signature_s", "string")
 
-            else:
-                logger.info(f"collection {self._granule_collection_name} already exists")
-
             if self._dataset_collection_name not in existing_collections:
                 # Create collection
                 payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
 
-            else:
-                logger.info(f"collection {self._dataset_collection_name} already exists")
-
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
 RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
 RUN apt-get update && apt-get install -y kubectl
 
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
 from config_operator.config_source import RemoteGitConfig, LocalDirConfig
 from config_operator.k8s import K8sConfigMap
 
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
+from common import AsyncUtils
 
 import pysolr
 from kazoo.handlers.threading import KazooTimeoutError


[incubator-sdap-ingester] 04/05: SDAP-277: Improved error handling in Granule Ingester (#15)

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit dc005850664bd76161fdea20ce596720953b3ee2
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 19:28:07 2020 -0700

    SDAP-277: Improved error handling in Granule Ingester (#15)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 granule_ingester/conda-requirements.txt            |   2 +-
 .../granule_ingester/consumer/Consumer.py          |  40 +++++--
 .../granule_ingester/pipeline/Pipeline.py          | 132 +++++++++++++--------
 .../reading_processors/TileReadingProcessor.py     |  20 ++--
 granule_ingester/tests/pipeline/test_Pipeline.py   |   9 +-
 5 files changed, 130 insertions(+), 73 deletions(-)

diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 5df51fe..6c72837 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,6 +17,8 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -39,7 +41,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -47,10 +49,10 @@ class Consumer(HealthCheck):
             await connection.close()
             return True
         except Exception:
-            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
-            return False
+            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+                                                 f"Connection string was {self._connection_string}")
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -75,19 +77,37 @@ class Consumer(HealthCheck):
                                             metadata_store_factory=metadata_store_factory,
                                             max_concurrency=pipeline_max_concurrency)
             await pipeline.run()
-            message.ack()
+            await message.ack()
+        except PipelineBuildingError as e:
+            await message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            await message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
-            message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            await message.reject(requeue=True)
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self, pipeline_max_concurrency=16):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message,
                                              self._data_store_factory,
                                              self._metadata_store_factory,
                                              pipeline_max_concurrency)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                await queue_iter.close()
+                await channel.close()
+                raise e
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e1e53bf..dabca81 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,38 +13,46 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
-import os
+import pickle
 import time
+from multiprocessing import Manager
 from typing import List
 
 import xarray as xr
 import yaml
 
-import aiomultiprocess
+from aiomultiprocess import Pool
+from aiomultiprocess.types import ProxyException
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
-from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.pipeline.Modules import \
+    modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.slicers import TileSlicer
 from granule_ingester.writers import DataStore, MetadataStore
 from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
 _worker_processor_list: List[TileProcessor] = None
 _worker_dataset = None
+_shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
     global _worker_data_store
     global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
+    global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
     # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -52,19 +60,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
+    _shared_memory = shared_memory
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
-    global _worker_data_store
-    global _worker_metadata_store
-    global _worker_processor_list
-    global _worker_dataset
+    try:
+        input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+        processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
-    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-    if processed_tile:
-        await _worker_data_store.save_data(processed_tile)
-        await _worker_metadata_store.save_metadata(processed_tile)
+        if processed_tile:
+            await _worker_data_store.save_data(processed_tile)
+            await _worker_metadata_store.save_metadata(processed_tile)
+    except Exception as e:
+        pickling_support.install(e)
+        _shared_memory.error = pickle.dumps(e)
+        raise
 
 
 def _recurse(processor_list: List[TileProcessor],
@@ -91,25 +101,34 @@ class Pipeline:
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = max_concurrency
 
-    @classmethod
-    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings,
-                                   max_concurrency)
+        # Create a SyncManager so that we can to communicate exceptions from the
+        # worker processes back to the main process.
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings,
                                        max_concurrency)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " +
+                                        "was received is not valid YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
@@ -117,17 +136,27 @@ class Pipeline:
                         metadata_store_factory,
                         module_mappings: dict,
                         max_concurrency: int):
-        granule_loader = GranuleLoader(**config['granule'])
-
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
-
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
-
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
+
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
+
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
+
+            return cls(granule_loader,
+                       slicer,
+                       data_store_factory,
+                       metadata_store_factory,
+                       tile_processors,
+                       max_concurrency)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -144,23 +173,32 @@ class Pipeline:
     async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      self._metadata_store_factory),
-                                            maxtasksperchild=self._max_concurrency,
-                                            childconcurrency=self._max_concurrency) as pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)


[incubator-sdap-ingester] 02/05: SDAP-266: add README note on synchronization of configmap with local path (#14)

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 6ebf49f12bf9803fd052a02b15063d3beacc394c
Author: thomas loubrieu <60...@users.noreply.github.com>
AuthorDate: Wed Aug 5 10:38:57 2020 -0700

    SDAP-266: add README note on synchronization of configmap with local path (#14)
    
    Co-authored-by: thomas loubrieu <th...@jpl.nasa.gov>
---
 config_operator/README.md               | 14 ++++++++++++--
 config_operator/config_operator/main.py | 17 ++++++++++-------
 2 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/config_operator/README.md b/config_operator/README.md
index 5f02804..d102d10 100644
--- a/config_operator/README.md
+++ b/config_operator/README.md
@@ -2,13 +2,23 @@
 
 ## Purpose
 
-Component which synchonizes local configuration in a directory, on a file system, or configuration files managed in a git repository with kubernetes configMap.
-This helps to make a configuration managed by the operators in a single place (git, host file system) available in the kubernetes cluster.
+Component which synchonizes configuration in a remote **GIT** repository with kubernetes configMap.
+This helps to make a configuration managed by the operators in a single place (git) available in the kubernetes cluster.
 
 For SDAP, it is used to make the configuration of the collections to be ingested available to the ingester service pods.
 
 The component runs as a kubernetes operator (see containerization section)
 
+To synchronize a configuration from a **local directory** on kubernetes hosts, you should use the following commands:
+
+    kubectl create configmap collections-config --from-file=/opt/sdap-collection-config/  -n <namespace> 
+    
+To update the configmap from the same directory run:
+
+    kubectl create configmap collections-config --from-file=/opt/sdap-collection-config/ -o yaml --dry-run | kubectl replace -n <namespace> -f -
+    
+
+
 # Developers
 
     git clone ...
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index 45d530f..fbbbe6b 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -1,14 +1,16 @@
 import logging
 import asyncio
 import kopf
-from config_operator.config_source import RemoteGitConfig
+from config_operator.config_source import RemoteGitConfig, LocalDirConfig
 from config_operator.k8s import K8sConfigMap
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 
-def create_config_synchronizer(spec, namespace):
+UPDATE_EVERY_SECOND_PROPERTY = 'update-every-seconds'
+
+def create_git_config_synchronizer(spec, namespace):
     if 'git-url' not in spec.keys():
         raise kopf.HandlerFatalError(f"git-url must be set.")
     if 'config-map' not in spec.keys():
@@ -20,7 +22,7 @@ def create_config_synchronizer(spec, namespace):
     logger.info(f'config-map = {config_map}')
 
     _kwargs = {}
-    for k in {'git-branch', 'git-username', 'git-token', 'update-every-seconds'}:
+    for k in {'git-branch', 'git-username', 'git-token', UPDATE_EVERY_SECOND_PROPERTY}:
         if k in spec:
             logger.info(f'{k} = {spec[k]}')
             _kwargs[k.replace('-', '_')] = spec[k]
@@ -38,24 +40,25 @@ def create_config_synchronizer(spec, namespace):
 
 @kopf.on.create('sdap.apache.org', 'v1', 'gitbasedconfigs')
 def create_fn(body, spec, **kwargs):
-    logger.info(f'sdap config operator creation')
+    logger.info(f'sdap git config operator creation')
 
     namespace = body['metadata']['namespace']
 
     msg = create_config_synchronizer(spec, namespace)
 
-    logger.info(f'sdap config operator created {msg}')
+    logger.info(f'sdap git config operator created {msg}')
 
     return {'message': msg}
 
 
+
 @kopf.on.update('sdap.apache.org', 'v1', 'gitbasedconfigs')
 def update_fn(spec, status, namespace, **kwargs):
-    logger.info(f'sdap config operator update')
+    logger.info(f'sdap git config operator update')
 
     msg = create_config_synchronizer(spec, namespace)
 
-    logger.info(f'sdap config operator updated {msg}')
+    logger.info(f'sdap local config operator updated {msg}')
 
     return {'message': msg}
 


[incubator-sdap-ingester] 03/05: SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 03badb4b57672ce63e0ac7231ecdae98c5f006e6
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 13:08:21 2020 -0700

    SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
---
 granule_ingester/docker/entrypoint.sh              |  1 +
 granule_ingester/granule_ingester/main.py          | 43 ++++++++----
 .../granule_ingester/writers/SolrStore.py          | 81 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  5 +-
 4 files changed, 85 insertions(+), 45 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 04ed15c..662bd3d 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -10,4 +10,5 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra-username=$CASSANDRA_USERNAME) \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) \
   $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index b54cffd..15390fd 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -16,13 +16,14 @@
 import argparse
 import asyncio
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
 def cassandra_factory(contact_points, port, username, password):
@@ -31,8 +32,8 @@ def cassandra_factory(contact_points, port, username, password):
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -44,7 +45,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq-host',
@@ -84,6 +85,8 @@ async def main():
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('--max-threads',
                         default=16,
                         metavar='MAX_THREADS',
@@ -111,6 +114,7 @@ async def main():
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
@@ -121,19 +125,28 @@ async def main():
                                                    cassandra_port,
                                                    cassandra_username,
                                                    cassandra_password),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
-    if await run_health_checks([CassandraStore(cassandra_contact_points,
-                                               cassandra_port,
-                                               cassandra_username,
-                                               cassandra_password),
-                                SolrStore(solr_host_and_port),
-                                consumer]):
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+    try:
+        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
+                                 solr_store,
+                                 consumer])
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
             await consumer.start_consuming(args.max_threads)
-    else:
-        logger.error("Quitting because not all dependencies passed the health checks.")
+    except FailedHealthCheckError as e:
+        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+    except LostConnectionError as e:
+        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+    except Exception as e:
+        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+    finally:
+        sys.exit(1)
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..e098672 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
-from nexusproto.DataTile_pb2 import *
-from tenacity import *
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
+from nexusproto.DataTile_pb2 import TileSummary, NexusTile
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
 
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            logger.error("Cannot connect to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,6 @@
 cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
 aioboto3
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file


[incubator-sdap-ingester] 01/05: SDAP-273: Configure max threads in Granule Ingester (#13)

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit fec32992abb8a62c30cd1ed51884806cf524438a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 19:14:22 2020 -0700

    SDAP-273: Configure max threads in Granule Ingester (#13)
---
 granule_ingester/docker/entrypoint.sh              | 19 ++++++++-------
 .../granule_ingester/consumer/Consumer.py          | 15 ++++++++----
 granule_ingester/granule_ingester/main.py          | 24 +++++++++++--------
 .../granule_ingester/pipeline/Pipeline.py          | 28 ++++++++++++++--------
 4 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..04ed15c 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,12 +1,13 @@
 #!/bin/sh
 
 python /sdap/granule_ingester/main.py \
-  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
-  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \
-  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \
-  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
-  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
-  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
-  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
+  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
+  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
+  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
+  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \
+  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra-username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
+  $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..5df51fe 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -46,7 +46,7 @@ class Consumer(HealthCheck):
             connection = await self._get_connection()
             await connection.close()
             return True
-        except:
+        except Exception:
             logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
             return False
 
@@ -64,25 +64,30 @@ class Consumer(HealthCheck):
     @staticmethod
     async def _received_message(message: aio_pika.IncomingMessage,
                                 data_store_factory,
-                                metadata_store_factory):
+                                metadata_store_factory,
+                                pipeline_max_concurrency: int):
         logger.info("Received a job from the queue. Starting pipeline.")
         try:
             config_str = message.body.decode("utf-8")
             logger.debug(config_str)
             pipeline = Pipeline.from_string(config_str=config_str,
                                             data_store_factory=data_store_factory,
-                                            metadata_store_factory=metadata_store_factory)
+                                            metadata_store_factory=metadata_store_factory,
+                                            max_concurrency=pipeline_max_concurrency)
             await pipeline.run()
             message.ack()
         except Exception as e:
             message.reject(requeue=True)
             logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
 
-    async def start_consuming(self):
+    async def start_consuming(self, pipeline_max_concurrency=16):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
 
         async with queue.iterator() as queue_iter:
             async for message in queue_iter:
-                await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+                await self._received_message(message,
+                                             self._data_store_factory,
+                                             self._metadata_store_factory,
+                                             pipeline_max_concurrency)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9010e33..b54cffd 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -47,43 +47,47 @@ async def run_health_checks(dependencies: List[HealthCheck]):
 async def main():
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
-    parser.add_argument('--rabbitmq_host',
+    parser.add_argument('--rabbitmq-host',
                         default='localhost',
                         metavar='HOST',
                         help='RabbitMQ hostname to connect to. (Default: "localhost")')
-    parser.add_argument('--rabbitmq_username',
+    parser.add_argument('--rabbitmq-username',
                         default='guest',
                         metavar='USERNAME',
                         help='RabbitMQ username. (Default: "guest")')
-    parser.add_argument('--rabbitmq_password',
+    parser.add_argument('--rabbitmq-password',
                         default='guest',
                         metavar='PASSWORD',
                         help='RabbitMQ password. (Default: "guest")')
-    parser.add_argument('--rabbitmq_queue',
+    parser.add_argument('--rabbitmq-queue',
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
-    parser.add_argument('--cassandra_contact_points',
+    parser.add_argument('--cassandra-contact-points',
                         default=['localhost'],
                         metavar="HOST",
                         nargs='+',
                         help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")')
-    parser.add_argument('--cassandra_port',
+    parser.add_argument('--cassandra-port',
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
-    parser.add_argument('--cassandra_username',
+    parser.add_argument('--cassandra-username',
                         metavar="USERNAME",
                         default=None,
                         help='Cassandra username. Optional.')
-    parser.add_argument('--cassandra_password',
+    parser.add_argument('--cassandra-password',
                         metavar="PASSWORD",
                         default=None,
                         help='Cassandra password. Optional.')
-    parser.add_argument('--solr_host_and_port',
+    parser.add_argument('--solr-host-and-port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--max-threads',
+                        default=16,
+                        metavar='MAX_THREADS',
+                        help='Maximum number of threads to use when processing granules. (Default: 16)')
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -126,7 +130,7 @@ async def main():
                                 consumer]):
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-            await consumer.start_consuming()
+            await consumer.start_consuming(args.max_threads)
     else:
         logger.error("Quitting because not all dependencies passed the health checks.")
 
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..e1e53bf 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -15,19 +15,20 @@
 
 
 import logging
+import os
 import time
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
-from nexusproto import DataTile_pb2 as nexusproto
 
+import aiomultiprocess
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.slicers import TileSlicer
 from granule_ingester.writers import DataStore, MetadataStore
+from nexusproto import DataTile_pb2 as nexusproto
 
 logger = logging.getLogger(__name__)
 
@@ -81,36 +82,41 @@ class Pipeline:
                  slicer: TileSlicer,
                  data_store_factory,
                  metadata_store_factory,
-                 tile_processors: List[TileProcessor]):
+                 tile_processors: List[TileProcessor],
+                 max_concurrency: int):
         self._granule_loader = granule_loader
         self._tile_processors = tile_processors
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
+        self._max_concurrency = max_concurrency
 
     @classmethod
-    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
         config = yaml.load(config_str, yaml.FullLoader)
         return cls._build_pipeline(config,
                                    data_store_factory,
                                    metadata_store_factory,
-                                   processor_module_mappings)
+                                   processor_module_mappings,
+                                   max_concurrency)
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
+    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
         with open(config_path) as config_file:
             config = yaml.load(config_file, yaml.FullLoader)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
-                                       processor_module_mappings)
+                                       processor_module_mappings,
+                                       max_concurrency)
 
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
-                        module_mappings: dict):
+                        module_mappings: dict,
+                        max_concurrency: int):
         granule_loader = GranuleLoader(**config['granule'])
 
         slicer_config = config['slicer']
@@ -121,7 +127,7 @@ class Pipeline:
             module = cls._parse_module(processor_config, module_mappings)
             tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -142,7 +148,9 @@ class Pipeline:
                                             initargs=(self._tile_processors,
                                                       dataset,
                                                       self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+                                                      self._metadata_store_factory),
+                                            maxtasksperchild=self._max_concurrency,
+                                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that