You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 16:31:39 UTC
[incubator-sdap-ingester] branch rabbitmq-fix updated: better exception handling
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
new 1061b77 better exception handling
1061b77 is described below
commit 1061b7775a040215c10c6bf0c2620ef8d2051c9a
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 11:31:28 2020 -0500
better exception handling
---
.../granule_ingester/consumer/Consumer.py | 5 ++++-
.../granule_ingester/exceptions/Exceptions.py | 10 +++++++++-
.../granule_ingester/exceptions/__init__.py | 4 +++-
.../granule_ingester/pipeline/Pipeline.py | 13 ++++++++++---
.../reading_processors/TileReadingProcessor.py | 20 +++++++++-----------
granule_ingester/requirements.txt | 2 +-
6 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 31454c1..fadfe75 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,9 +17,9 @@ import logging
import aio_pika
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
-from granule_ingester.exceptions import PipelineBuildingError
logger = logging.getLogger(__name__)
@@ -79,6 +79,9 @@ class Consumer(HealthCheck):
message.reject()
logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
f"from RabbitMQ. The exception was:\n{e}")
+ except PipelineRunningError as e:
+ message.reject()
+ logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
except Exception as e:
message.reject(requeue=True)
logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 4c03e48..8c25532 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -1,2 +1,10 @@
class PipelineBuildingError(Exception):
- pass
\ No newline at end of file
+ pass
+
+
+class PipelineRunningError(Exception):
+ pass
+
+
+class TileProcessingError(Exception):
+ pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index a36b19a..71607c2 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1 +1,3 @@
-from .Exceptions import PipelineBuildingError
\ No newline at end of file
+from .Exceptions import TileProcessingError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f872e4d..c7b5d6a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,13 +17,15 @@
import logging
import time
from typing import List
-from granule_ingester.exceptions import PipelineBuildingError
+
import aiomultiprocess
import xarray as xr
import yaml
-from yaml.scanner import ScannerError
+from aiomultiprocess.types import ProxyException
from nexusproto import DataTile_pb2 as nexusproto
+from yaml.scanner import ScannerError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -62,6 +64,7 @@ async def _process_tile_in_worker(serialized_input_tile: str):
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+
if processed_tile:
await _worker_data_store.save_data(processed_tile)
await _worker_metadata_store.save_metadata(processed_tile)
@@ -149,7 +152,11 @@ class Pipeline:
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
- await pool.map(_process_tile_in_worker, chunk)
+ try:
+ await pool.map(_process_tile_in_worker, chunk)
+ except ProxyException:
+ pool.terminate()
+ raise PipelineRunningError("Running the pipeline failed and could not recover.")
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
import xarray as xr
from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
self.latitude = latitude
self.longitude = longitude
- # Common optional properties
- self.temp_dir = None
- self.metadata = None
- # self.temp_dir = self.environ['TEMP_DIR']
- # self.metadata = self.environ['META']
-
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
- dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+ try:
+ dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
- output_tile = nexusproto.NexusTile()
- output_tile.CopyFrom(tile)
- output_tile.summary.data_var_name = self.variable_to_read
+ output_tile = nexusproto.NexusTile()
+ output_tile.CopyFrom(tile)
+ output_tile.summary.data_var_name = self.variable_to_read
- return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ except Exception:
+ raise TileProcessingError("Could not generate tiles from the granule.")
@abstractmethod
def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..a6d64a2 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,3 @@
cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
aioboto3