You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:54 UTC
[incubator-sdap-ingester] 07/10: error handling
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 385f5b83a169bf723608512fb98045b53f8dd5f2
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 17:36:08 2020 -0700
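error handling: exit main() through a single finally clause, create a fresh Manager Namespace per Pipeline.run() and shut the Manager down in Pipeline.__del__, reduce the aiomultiprocess chunk size to 2**8-1, and treat only NoHostAvailable as a lost Cassandra connection.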
---
granule_ingester/granule_ingester/main.py | 8 ++---
.../granule_ingester/pipeline/Pipeline.py | 35 +++++++++++++---------
.../granule_ingester/writers/CassandraStore.py | 2 +-
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index b9d475b..45877c2 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
return True
-async def main():
+async def main(loop):
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
'and ingest a granule for each message that comes through.')
parser.add_argument('--rabbitmq_host',
@@ -115,14 +115,14 @@ async def main():
await consumer.start_consuming()
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
- sys.exit(1)
except LostConnectionError as e:
logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
- sys.exit(1)
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+ finally:
sys.exit(1)
if __name__ == '__main__':
- asyncio.run(main())
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(main(loop))
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 14dc032..f1aa021 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -19,15 +19,15 @@ import time
from multiprocessing import Manager
from typing import List
-import aiomultiprocess
import xarray as xr
import yaml
+from aiomultiprocess import Pool
from aiomultiprocess.types import ProxyException
from nexusproto import DataTile_pb2 as nexusproto
from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError
+from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore
logger = logging.getLogger(__name__)
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
@@ -97,9 +99,12 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
- # Create a SyncManager Namespace so that we can to communicate exceptions from the
+ # Create a SyncManager so that we can communicate exceptions from the
# worker processes back to the main process.
- self._shared_memory = Manager().Namespace()
+ self._manager = Manager()
+
+ def __del__(self):
+ self._manager.shutdown()
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
@@ -158,26 +163,28 @@ class Pipeline:
async def run(self):
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
- async with aiomultiprocess.Pool(initializer=_init_worker,
- initargs=(self._tile_processors,
- dataset,
- self._data_store_factory,
- self._metadata_store_factory,
- self._shared_memory)) as pool:
+
+ shared_memory = self._manager.Namespace()
+ async with Pool(initializer=_init_worker,
+ initargs=(self._tile_processors,
+ dataset,
+ self._data_store_factory,
+ self._metadata_store_factory,
+ shared_memory)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+ for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
try:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
- raise pickle.loads(self._shared_memory.error)
+ raise pickle.loads(shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
@staticmethod
- def _chunk_list(items, chunk_size):
+ def _chunk_list(items, chunk_size: int):
return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 791911e..6b2cf32 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -76,7 +76,7 @@ class CassandraStore(DataStore):
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
await self._execute_query_async(self._session, prepared_query,
[tile_id, bytearray(serialized_tile_data)])
- except Exception:
+ except NoHostAvailable:
raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
@staticmethod