You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/10 15:55:23 UTC

[incubator-sdap-ingester] 04/04: the healthchecks now raise exceptions if they fail

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit c6fdd6f6a41dd281c5bbab976b2c607bd2429f30
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Fri Jul 10 10:14:05 2020 -0500

    the healthchecks now raise exceptions if they fail
---
 .../granule_ingester/consumer/Consumer.py            | 12 ++++++------
 .../granule_ingester/exceptions/Exceptions.py        | 18 +++++++++++++++++-
 .../granule_ingester/exceptions/__init__.py          |  6 +++++-
 granule_ingester/granule_ingester/main.py            | 20 ++++++++++----------
 .../granule_ingester/writers/CassandraStore.py       |  9 +++++----
 .../granule_ingester/writers/DataStore.py            |  1 +
 .../granule_ingester/writers/SolrStore.py            |  4 ++--
 7 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 439415f..59db4e8 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,9 @@
 import logging
 
 import aio_pika
-import sys
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
+
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
+    RabbitMQFailedHealthCheckError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -48,8 +49,8 @@ class Consumer(HealthCheck):
             await connection.close()
             return True
         except:
-            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
-            return False
+            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+                                                 f"Connection string was {self._connection_string}")
 
     async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
@@ -97,8 +98,7 @@ class Consumer(HealthCheck):
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
-                raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+                raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 queue_iter.close()
                 raise e
-
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 0dda59e..6e7d89a 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,5 +10,21 @@ class TileProcessingError(Exception):
     pass
 
 
-class ConnectionErrorRabbitMQ(Exception):
+class RabbitMQConnectionError(Exception):
+    pass
+
+
+class FailedHealthCheckError(Exception):
+    pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index d0c2344..2ba1b4a 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,4 +1,8 @@
-from .Exceptions import ConnectionErrorRabbitMQ
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import FailedHealthCheckError
 from .Exceptions import PipelineBuildingError
 from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQConnectionError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import SolrFailedHealthCheckError
 from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 6e1a289..2754c7f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,6 +15,7 @@
 
 import argparse
 import asyncio
+from granule_ingester.exceptions import FailedHealthCheckError
 import logging
 from functools import partial
 from typing import List
@@ -106,16 +107,15 @@ async def main():
                         data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
                         metadata_store_factory=partial(solr_factory, solr_host_and_port))
     try:
-        if await run_health_checks(
-                [CassandraStore(cassandra_contact_points, cassandra_port),
-                 SolrStore(solr_host_and_port),
-                 consumer]):
-            async with consumer:
-                logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-                await consumer.start_consuming()
-        else:
-            logger.error("Quitting because not all dependencies passed the health checks.")
-            sys.exit(1)
+        await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+                                 SolrStore(solr_host_and_port),
+                                 consumer])
+        async with consumer:
+            logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+            await consumer.start_consuming()
+    except FailedHealthCheckError as e:
+        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+        sys.exit(1)
     except Exception as e:
         logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
         sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index ffac63d..530871d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -23,6 +23,7 @@ from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
+from granule_ingester.exceptions import CassandraFailedHealthCheckError
 from granule_ingester.writers.DataStore import DataStore
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -47,9 +48,8 @@ class CassandraStore(DataStore):
             session = self._get_session()
             session.shutdown()
             return True
-        except:
-            logger.error("Cannot connect to Cassandra!")
-            return False
+        except Exception:
+            raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
 
     def _get_session(self) -> Session:
         cluster = Cluster(contact_points=self._contact_points, port=self._port)
@@ -69,7 +69,8 @@ class CassandraStore(DataStore):
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
             prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
-            await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+            await type(self)._execute_query_async(self._session, prepared_query,
+                                                  [tile_id, bytearray(serialized_tile_data)])
         except NoHostAvailable as e:
             logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
 
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
 
 class DataStore(HealthCheck, ABC):
 
+
     @abstractmethod
     def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..6baad28 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -26,7 +26,7 @@ from nexusproto.DataTile_pb2 import *
 from tenacity import *
 
 from granule_ingester.writers.MetadataStore import MetadataStore
-
+from granule_ingester.exceptions import SolrFailedHealthCheckError
 logger = logging.getLogger(__name__)
 
 
@@ -56,7 +56,7 @@ class SolrStore(MetadataStore):
                 else:
                     logger.error("Solr health check returned status {}.".format(response.status))
         except aiohttp.ClientConnectionError as e:
-            logger.error("Cannot connect to Solr!")
+            raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
 
         return False