You are viewing a plain-text version of this content. (The canonical link that appeared here in the original page was not preserved in this plain-text copy.)
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 16:31:39 UTC

[incubator-sdap-ingester] branch rabbitmq-fix updated: better exception handling

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
     new 1061b77  better exception handling
1061b77 is described below

commit 1061b7775a040215c10c6bf0c2620ef8d2051c9a
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Thu Jul 9 11:31:28 2020 -0500

    better exception handling
---
 .../granule_ingester/consumer/Consumer.py            |  5 ++++-
 .../granule_ingester/exceptions/Exceptions.py        | 10 +++++++++-
 .../granule_ingester/exceptions/__init__.py          |  4 +++-
 .../granule_ingester/pipeline/Pipeline.py            | 13 ++++++++++---
 .../reading_processors/TileReadingProcessor.py       | 20 +++++++++-----------
 granule_ingester/requirements.txt                    |  2 +-
 6 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 31454c1..fadfe75 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,9 +17,9 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
-from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -79,6 +79,9 @@ class Consumer(HealthCheck):
             message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 4c03e48..8c25532 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -1,2 +1,10 @@
 class PipelineBuildingError(Exception):
-    pass
\ No newline at end of file
+    pass
+
+
+class PipelineRunningError(Exception):
+    pass
+
+
+class TileProcessingError(Exception):
+    pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index a36b19a..71607c2 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1 +1,3 @@
-from .Exceptions import PipelineBuildingError
\ No newline at end of file
+from .Exceptions import TileProcessingError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f872e4d..c7b5d6a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,13 +17,15 @@
 import logging
 import time
 from typing import List
-from granule_ingester.exceptions import PipelineBuildingError
+
 import aiomultiprocess
 import xarray as xr
 import yaml
-from yaml.scanner import ScannerError
+from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
+from yaml.scanner import ScannerError
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -62,6 +64,7 @@ async def _process_tile_in_worker(serialized_input_tile: str):
 
     input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+
     if processed_tile:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
@@ -149,7 +152,11 @@ class Pipeline:
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                 for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        raise PipelineRunningError("Running the pipeline failed and could not recover.")
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..a6d64a2 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,3 @@
 cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
 aioboto3