Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:54 UTC

[incubator-sdap-ingester] 07/10: error handling

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 385f5b83a169bf723608512fb98045b53f8dd5f2
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 17:36:08 2020 -0700

    error handling
---
 granule_ingester/granule_ingester/main.py          |  8 ++---
 .../granule_ingester/pipeline/Pipeline.py          | 35 +++++++++++++---------
 .../granule_ingester/writers/CassandraStore.py     |  2 +-
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index b9d475b..45877c2 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
-async def main():
+async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
     parser.add_argument('--rabbitmq_host',
@@ -115,14 +115,14 @@ async def main():
             await consumer.start_consuming()
     except FailedHealthCheckError as e:
         logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
-        sys.exit(1)
     except LostConnectionError as e:
         logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
-        sys.exit(1)
     except Exception as e:
         logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+    finally:
         sys.exit(1)
 
 
 if __name__ == '__main__':
-    asyncio.run(main())
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(loop))
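
The net effect in main.py is that the three separate sys.exit(1) calls collapse
into a single finally block, so the process exits with status 1 whenever
start_consuming() returns or raises, while each failure mode keeps its own log
message. A condensed sketch of the resulting flow (not the actual module; the
exception classes and the consumer are stubbed out for illustration):

    import asyncio
    import logging
    import sys

    logger = logging.getLogger(__name__)

    class FailedHealthCheckError(Exception): ...
    class LostConnectionError(Exception): ...

    async def start_consuming():
        # Stand-in for consumer.start_consuming(); raise one of the
        # exceptions above to exercise a specific handler.
        raise LostConnectionError("Lost connection to RabbitMQ.")

    async def main(loop):
        try:
            await start_consuming()
        except FailedHealthCheckError as e:
            logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
        except LostConnectionError as e:
            logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
        except Exception as e:
            logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
        finally:
            # Runs on success and failure alike, so the process always
            # exits 1 once the consumer stops.
            sys.exit(1)

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
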
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 14dc032..f1aa021 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -19,15 +19,15 @@ import time
 from multiprocessing import Manager
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
+from aiomultiprocess import Pool
 from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError
+from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
@@ -97,9 +99,12 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
-        # Create a SyncManager Namespace so that we can to communicate exceptions from the
+        # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
-        self._shared_memory = Manager().Namespace()
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
@@ -158,26 +163,28 @@ class Pipeline:
     async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      self._metadata_store_factory,
-                                                      self._shared_memory)) as pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
                     try:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
-                        raise pickle.loads(self._shared_memory.error)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
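
Two mechanics are at work in the Pipeline change: pool.map() is now fed chunks
of at most MAX_CHUNK_SIZE tiles, and each run() builds a fresh
Manager().Namespace() through which a failing worker hands a pickled exception
back to the parent, which re-raises it after terminating the pool. A
stripped-down sketch of that round trip, with tile processing replaced by a toy
task (the item-13 failure and the helper names are illustrative):

    import pickle
    from multiprocessing import Manager

    from aiomultiprocess import Pool
    from aiomultiprocess.types import ProxyException
    from tblib import pickling_support

    MAX_CHUNK_SIZE = 2 ** 8 - 1  # keep each pool.map() batch below the problematic size

    def _chunk_list(items, chunk_size: int):
        return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    _worker_shared_memory = None

    def _init_worker(shared_memory):
        # Runs once per worker process; keeps a handle to the namespace proxy.
        global _worker_shared_memory
        _worker_shared_memory = shared_memory

    async def _process_item(item):
        try:
            if item == 13:
                raise ValueError(f"cannot process item {item}")
        except Exception as e:
            # Stash the pickled exception (traceback preserved by tblib) in the
            # SyncManager namespace so the parent can re-raise the real error
            # instead of an opaque ProxyException.
            pickling_support.install(e)
            _worker_shared_memory.error = pickle.dumps(e)
            raise

    async def run():
        manager = Manager()
        shared_memory = manager.Namespace()  # fresh namespace per run
        async with Pool(initializer=_init_worker, initargs=(shared_memory,)) as pool:
            for chunk in _chunk_list(list(range(1000)), MAX_CHUNK_SIZE):
                try:
                    await pool.map(_process_item, chunk)
                except ProxyException:
                    pool.terminate()
                    raise pickle.loads(shared_memory.error)

    if __name__ == '__main__':
        import asyncio
        asyncio.get_event_loop().run_until_complete(run())
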
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 791911e..6b2cf32 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -76,7 +76,7 @@ class CassandraStore(DataStore):
             prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, bytearray(serialized_tile_data)])
-        except Exception:
+        except NoHostAvailable:
             raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
 
     @staticmethod
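
Narrowing the catch-all except Exception in CassandraStore means only a genuine
loss of connectivity is reported as CassandraLostConnectionError; any other
failure (a bad query, a serialization bug) now surfaces with its real traceback
instead of being misreported as a lost connection. A minimal sketch of the same
pattern against the DataStax driver (the table comes from the diff; the helper
name and session wiring are illustrative):

    from cassandra.cluster import NoHostAvailable

    class CassandraLostConnectionError(Exception):
        pass

    def save_tile(session, tile_id, serialized_tile_data: bytes):
        try:
            prepared = session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
            session.execute(prepared, [tile_id, bytearray(serialized_tile_data)])
        except NoHostAvailable:
            # Connectivity failure: translate to the ingester's domain error...
            raise CassandraLostConnectionError("Lost connection to Cassandra and cannot save tiles.")
        # ...while query-level errors (InvalidRequest, WriteTimeout, etc.)
        # propagate unchanged with their original tracebacks.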