You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/03 20:59:36 UTC
[incubator-sdap-ingester] 06/10: exc handling
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit aa3b7b2d544a75ee1a61610d97bceba8dc0ff206
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 14:08:05 2020 -0700
exc handling
---
granule_ingester/granule_ingester/consumer/Consumer.py | 11 +++++++----
granule_ingester/granule_ingester/exceptions/Exceptions.py | 10 ++++++----
granule_ingester/granule_ingester/exceptions/__init__.py | 5 +++--
granule_ingester/granule_ingester/main.py | 5 ++++-
granule_ingester/granule_ingester/writers/CassandraStore.py | 4 ++--
5 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 59db4e8..d40b54c 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,8 +17,8 @@ import logging
import aio_pika
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
- RabbitMQFailedHealthCheckError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+ RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -83,6 +83,9 @@ class Consumer(HealthCheck):
except PipelineRunningError as e:
await message.reject()
logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+ except LostConnectionError:
+ # Let main() handle this
+ raise
except Exception as e:
await message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
@@ -98,7 +101,7 @@ class Consumer(HealthCheck):
except aio_pika.exceptions.MessageProcessError:
# Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
# connection has died, and attempting to close the queue will only raise another exception.
- raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
+ raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
except Exception as e:
- queue_iter.close()
+ await queue_iter.close()
raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index f43bc2f..ca60608 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,11 +10,15 @@ class TileProcessingError(Exception):
pass
-class RabbitMQConnectionError(Exception):
+class LostConnectionError(Exception):
pass
-class CassandraConnectionError(Exception):
+class RabbitMQLostConnectionError(LostConnectionError):
+ pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
pass
@@ -32,5 +36,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
-
-
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 400c9bf..31cc5b8 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,9 +1,10 @@
-from .Exceptions import CassandraConnectionError
from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
from .Exceptions import PipelineBuildingError
from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQConnectionError
from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
from .Exceptions import SolrFailedHealthCheckError
from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 2754c7f..b9d475b 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,7 +15,7 @@
import argparse
import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
import logging
from functools import partial
from typing import List
@@ -116,6 +116,9 @@ async def main():
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
sys.exit(1)
+ except LostConnectionError as e:
+ logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+ sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index fbb5a7d..791911e 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -24,7 +24,7 @@ from cassandra.cqlengine.models import Model
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from nexusproto.DataTile_pb2 import NexusTile, TileData
-from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -77,7 +77,7 @@ class CassandraStore(DataStore):
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
except Exception:
- raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.")
+ raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):