You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/10 15:55:23 UTC
[incubator-sdap-ingester] 04/04: the healthchecks now raise
exceptions if they fail
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit c6fdd6f6a41dd281c5bbab976b2c607bd2429f30
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Fri Jul 10 10:14:05 2020 -0500
the healthchecks now raise exceptions if they fail
---
.../granule_ingester/consumer/Consumer.py | 12 ++++++------
.../granule_ingester/exceptions/Exceptions.py | 18 +++++++++++++++++-
.../granule_ingester/exceptions/__init__.py | 6 +++++-
granule_ingester/granule_ingester/main.py | 20 ++++++++++----------
.../granule_ingester/writers/CassandraStore.py | 9 +++++----
.../granule_ingester/writers/DataStore.py | 1 +
.../granule_ingester/writers/SolrStore.py | 4 ++--
7 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 439415f..59db4e8 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,9 @@
import logging
import aio_pika
-import sys
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
+
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
+ RabbitMQFailedHealthCheckError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -48,8 +49,8 @@ class Consumer(HealthCheck):
await connection.close()
return True
except:
- logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
- return False
+ raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+ f"Connection string was {self._connection_string}")
async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
@@ -97,8 +98,7 @@ class Consumer(HealthCheck):
except aio_pika.exceptions.MessageProcessError:
# Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
# connection has died, and attempting to close the queue will only raise another exception.
- raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+ raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
queue_iter.close()
raise e
-
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 0dda59e..6e7d89a 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,5 +10,21 @@ class TileProcessingError(Exception):
pass
-class ConnectionErrorRabbitMQ(Exception):
+class RabbitMQConnectionError(Exception):
+ pass
+
+
+class FailedHealthCheckError(Exception):
+ pass
+
+
+class CassandraFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class SolrFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
+class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index d0c2344..2ba1b4a 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,4 +1,8 @@
-from .Exceptions import ConnectionErrorRabbitMQ
+from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import FailedHealthCheckError
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
+from .Exceptions import RabbitMQConnectionError
+from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import SolrFailedHealthCheckError
from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 6e1a289..2754c7f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,6 +15,7 @@
import argparse
import asyncio
+from granule_ingester.exceptions import FailedHealthCheckError
import logging
from functools import partial
from typing import List
@@ -106,16 +107,15 @@ async def main():
data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
metadata_store_factory=partial(solr_factory, solr_host_and_port))
try:
- if await run_health_checks(
- [CassandraStore(cassandra_contact_points, cassandra_port),
- SolrStore(solr_host_and_port),
- consumer]):
- async with consumer:
- logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming()
- else:
- logger.error("Quitting because not all dependencies passed the health checks.")
- sys.exit(1)
+ await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+ SolrStore(solr_host_and_port),
+ consumer])
+ async with consumer:
+ logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+ await consumer.start_consuming()
+ except FailedHealthCheckError as e:
+ logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+ sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index ffac63d..530871d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -23,6 +23,7 @@ from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from nexusproto.DataTile_pb2 import NexusTile, TileData
+from granule_ingester.exceptions import CassandraFailedHealthCheckError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -47,9 +48,8 @@ class CassandraStore(DataStore):
session = self._get_session()
session.shutdown()
return True
- except:
- logger.error("Cannot connect to Cassandra!")
- return False
+ except Exception:
+ raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
cluster = Cluster(contact_points=self._contact_points, port=self._port)
@@ -69,7 +69,8 @@ class CassandraStore(DataStore):
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)])
+ await type(self)._execute_query_async(self._session, prepared_query,
+ [tile_id, bytearray(serialized_tile_data)])
except NoHostAvailable as e:
logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
class DataStore(HealthCheck, ABC):
+
@abstractmethod
def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 9d6a7f0..6baad28 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -26,7 +26,7 @@ from nexusproto.DataTile_pb2 import *
from tenacity import *
from granule_ingester.writers.MetadataStore import MetadataStore
-
+from granule_ingester.exceptions import SolrFailedHealthCheckError
logger = logging.getLogger(__name__)
@@ -56,7 +56,7 @@ class SolrStore(MetadataStore):
else:
logger.error("Solr health check returned status {}.".format(response.status))
except aiohttp.ClientConnectionError as e:
- logger.error("Cannot connect to Solr!")
+ raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
return False