You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:53 UTC

[incubator-sdap-ingester] 06/10: exc handling

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 41755a5f1663085094c9d55de8c183748b65bb63
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 14:08:05 2020 -0700

    exc handling
---
 granule_ingester/granule_ingester/consumer/Consumer.py      | 11 +++++++----
 granule_ingester/granule_ingester/exceptions/Exceptions.py  | 10 ++++++----
 granule_ingester/granule_ingester/exceptions/__init__.py    |  5 +++--
 granule_ingester/granule_ingester/main.py                   |  5 ++++-
 granule_ingester/granule_ingester/writers/CassandraStore.py |  4 ++--
 5 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 59db4e8..d40b54c 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,8 +17,8 @@ import logging
 
 import aio_pika
 
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
-    RabbitMQFailedHealthCheckError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -83,6 +83,9 @@ class Consumer(HealthCheck):
         except PipelineRunningError as e:
             await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
             await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
@@ -98,7 +101,7 @@ class Consumer(HealthCheck):
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
-                raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
-                queue_iter.close()
+                await queue_iter.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index f43bc2f..ca60608 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -10,11 +10,15 @@ class TileProcessingError(Exception):
     pass
 
 
-class RabbitMQConnectionError(Exception):
+class LostConnectionError(Exception):
     pass
 
 
-class CassandraConnectionError(Exception):
+class RabbitMQLostConnectionError(LostConnectionError):
+    pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
     pass
 
 
@@ -32,5 +36,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
 
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
-
-
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 400c9bf..31cc5b8 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,9 +1,10 @@
-from .Exceptions import CassandraConnectionError
 from .Exceptions import CassandraFailedHealthCheckError
+from .Exceptions import CassandraLostConnectionError
 from .Exceptions import FailedHealthCheckError
+from .Exceptions import LostConnectionError
 from .Exceptions import PipelineBuildingError
 from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQConnectionError
 from .Exceptions import RabbitMQFailedHealthCheckError
+from .Exceptions import RabbitMQLostConnectionError
 from .Exceptions import SolrFailedHealthCheckError
 from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 2754c7f..b9d475b 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,7 +15,7 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
 from functools import partial
 from typing import List
@@ -116,6 +116,9 @@ async def main():
     except FailedHealthCheckError as e:
         logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
         sys.exit(1)
+    except LostConnectionError as e:
+        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+        sys.exit(1)
     except Exception as e:
         logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
         sys.exit(1)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index fbb5a7d..791911e 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -24,7 +24,7 @@ from cassandra.cqlengine.models import Model
 from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
-from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -77,7 +77,7 @@ class CassandraStore(DataStore):
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, bytearray(serialized_tile_data)])
         except Exception:
-            raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.")
+            raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):