You are viewing a plain-text version of this content. The canonical link for it is here (hyperlink not preserved in this plain-text copy).
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 22:15:14 UTC
[incubator-sdap-ingester] branch rabbitmq-fix updated: error handling
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
new 293efc4 error handling
293efc4 is described below
commit 293efc4db504c850d94d20f1fd974b91fa29fd46
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 17:15:03 2020 -0500
error handling
---
.../granule_ingester/consumer/Consumer.py | 30 ++++++++++++++--------
.../granule_ingester/exceptions/Exceptions.py | 4 +++
.../granule_ingester/exceptions/__init__.py | 3 ++-
granule_ingester/granule_ingester/main.py | 24 ++++++++++-------
.../granule_ingester/pipeline/Pipeline.py | 4 +++
.../granule_ingester/writers/CassandraStore.py | 13 ++++++----
6 files changed, 52 insertions(+), 26 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index fadfe75..439415f 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,8 @@
import logging
import aio_pika
-
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+import sys
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -40,7 +40,7 @@ class Consumer(HealthCheck):
self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
password=rabbitmq_password,
host=rabbitmq_host)
- self._connection = None
+ self._connection: aio_pika.Connection = None
async def health_check(self) -> bool:
try:
@@ -51,7 +51,7 @@ class Consumer(HealthCheck):
logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
return False
- async def _get_connection(self):
+ async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
async def __aenter__(self):
@@ -74,23 +74,31 @@ class Consumer(HealthCheck):
data_store_factory=data_store_factory,
metadata_store_factory=metadata_store_factory)
await pipeline.run()
- message.ack()
+ await message.ack()
except PipelineBuildingError as e:
- message.reject()
+ await message.reject()
logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
f"from RabbitMQ. The exception was:\n{e}")
except PipelineRunningError as e:
- message.reject()
+ await message.reject()
logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
except Exception as e:
- message.reject(requeue=True)
+ await message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self):
channel = await self._connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
- async with queue.iterator() as queue_iter:
- async for message in queue_iter:
+ queue_iter = queue.iterator()
+ async for message in queue_iter:
+ try:
await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+ except aio_pika.exceptions.MessageProcessError:
+ # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+ # connection has died, and attempting to close the queue will only raise another exception.
+ raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+ except Exception as e:
+ queue_iter.close()
+ raise e
+
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 8c25532..0dda59e 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -8,3 +8,7 @@ class PipelineRunningError(Exception):
class TileProcessingError(Exception):
pass
+
+
+class ConnectionErrorRabbitMQ(Exception):
+ pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 71607c2..d0c2344 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
-from .Exceptions import TileProcessingError
+from .Exceptions import ConnectionErrorRabbitMQ
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
+from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 5a8fc2d..6e1a289 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore
from granule_ingester.writers import SolrStore
+import sys
def cassandra_factory(contact_points, port):
@@ -104,15 +105,20 @@ async def main():
rabbitmq_queue=args.rabbitmq_queue,
data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
metadata_store_factory=partial(solr_factory, solr_host_and_port))
- if await run_health_checks(
- [CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
- consumer]):
- async with consumer:
- logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming()
- else:
- logger.error("Quitting because not all dependencies passed the health checks.")
+ try:
+ if await run_health_checks(
+ [CassandraStore(cassandra_contact_points, cassandra_port),
+ SolrStore(solr_host_and_port),
+ consumer]):
+ async with consumer:
+ logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+ await consumer.start_consuming()
+ else:
+ logger.error("Quitting because not all dependencies passed the health checks.")
+ sys.exit(1)
+ except Exception as e:
+ logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+ sys.exit(1)
if __name__ == '__main__':
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index c7b5d6a..e52d99f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -22,6 +22,7 @@ import aiomultiprocess
import xarray as xr
import yaml
from aiomultiprocess.types import ProxyException
+from cassandra.cluster import NoHostAvailable
from nexusproto import DataTile_pb2 as nexusproto
from yaml.scanner import ScannerError
@@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
if processed_tile:
+ # try:
await _worker_data_store.save_data(processed_tile)
await _worker_metadata_store.save_metadata(processed_tile)
+ # except NoHostAvailable as e:
+ # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
def _recurse(processor_list: List[TileProcessor],
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 7a9f146..ffac63d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,7 +18,7 @@ import asyncio
import logging
import uuid
-from cassandra.cluster import Cluster, Session
+from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from nexusproto.DataTile_pb2 import NexusTile, TileData
@@ -65,10 +65,13 @@ class CassandraStore(DataStore):
self._session.shutdown()
async def save_data(self, tile: NexusTile) -> None:
- tile_id = uuid.UUID(tile.summary.tile_id)
- serialized_tile_data = TileData.SerializeToString(tile.tile)
- prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ try:
+ tile_id = uuid.UUID(tile.summary.tile_id)
+ serialized_tile_data = TileData.SerializeToString(tile.tile)
+ prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+ await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ except NoHostAvailable as e:
+ logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):