You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 22:15:14 UTC

[incubator-sdap-ingester] branch rabbitmq-fix updated: error handling

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
     new 293efc4  error handling
293efc4 is described below

commit 293efc4db504c850d94d20f1fd974b91fa29fd46
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 17:15:03 2020 -0500

    error handling
---
 .../granule_ingester/consumer/Consumer.py          | 30 ++++++++++++++--------
 .../granule_ingester/exceptions/Exceptions.py      |  4 +++
 .../granule_ingester/exceptions/__init__.py        |  3 ++-
 granule_ingester/granule_ingester/main.py          | 24 ++++++++++-------
 .../granule_ingester/pipeline/Pipeline.py          |  4 +++
 .../granule_ingester/writers/CassandraStore.py     | 13 ++++++----
 6 files changed, 52 insertions(+), 26 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index fadfe75..439415f 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,8 @@
 import logging
 
 import aio_pika
-
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+import sys
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -40,7 +40,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -51,7 +51,7 @@ class Consumer(HealthCheck):
             logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
             return False
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -74,23 +74,31 @@ class Consumer(HealthCheck):
                                             data_store_factory=data_store_factory,
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
-            message.ack()
+            await message.ack()
         except PipelineBuildingError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
         except PipelineRunningError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
-            message.reject(requeue=True)
+            await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                queue_iter.close()
+                raise e
+
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 8c25532..0dda59e 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -8,3 +8,7 @@ class PipelineRunningError(Exception):
 
 class TileProcessingError(Exception):
     pass
+
+
+class ConnectionErrorRabbitMQ(Exception):
+    pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 71607c2..d0c2344 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
-from .Exceptions import TileProcessingError
+from .Exceptions import ConnectionErrorRabbitMQ
 from .Exceptions import PipelineBuildingError
 from .Exceptions import PipelineRunningError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..6e1a289 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore
 from granule_ingester.writers import SolrStore
+import sys
 
 
 def cassandra_factory(contact_points, port):
@@ -104,15 +105,20 @@ async def main():
                         rabbitmq_queue=args.rabbitmq_queue,
                         data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
                         metadata_store_factory=partial(solr_factory, solr_host_and_port))
-    if await run_health_checks(
-            [CassandraStore(cassandra_contact_points, cassandra_port),
-             SolrStore(solr_host_and_port),
-             consumer]):
-        async with consumer:
-            logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-            await consumer.start_consuming()
-    else:
-        logger.error("Quitting because not all dependencies passed the health checks.")
+    try:
+        if await run_health_checks(
+                [CassandraStore(cassandra_contact_points, cassandra_port),
+                 SolrStore(solr_host_and_port),
+                 consumer]):
+            async with consumer:
+                logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+                await consumer.start_consuming()
+        else:
+            logger.error("Quitting because not all dependencies passed the health checks.")
+            sys.exit(1)
+    except Exception as e:
+        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+        sys.exit(1)
 
 
 if __name__ == '__main__':
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index c7b5d6a..e52d99f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -22,6 +22,7 @@ import aiomultiprocess
 import xarray as xr
 import yaml
 from aiomultiprocess.types import ProxyException
+from cassandra.cluster import NoHostAvailable
 from nexusproto import DataTile_pb2 as nexusproto
 from yaml.scanner import ScannerError
 
@@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
     if processed_tile:
+        # try:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
+        # except NoHostAvailable as e:
+        #     logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
 
 
 def _recurse(processor_list: List[TileProcessor],
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..ffac63d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,7 +18,7 @@ import asyncio
 import logging
 import uuid
 
-from cassandra.cluster import Cluster, Session
+from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
 from nexusproto.DataTile_pb2 import NexusTile, TileData
@@ -65,10 +65,13 @@ class CassandraStore(DataStore):
             self._session.shutdown()
 
     async def save_data(self, tile: NexusTile) -> None:
-        tile_id = uuid.UUID(tile.summary.tile_id)
-        serialized_tile_data = TileData.SerializeToString(tile.tile)
-        prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
-        await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+        try:
+            tile_id = uuid.UUID(tile.summary.tile_id)
+            serialized_tile_data = TileData.SerializeToString(tile.tile)
+            prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+        except NoHostAvailable as e:
+            logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):