You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:16:11 UTC

[incubator-sdap-ingester] 04/05: SDAP-277: Improved error handling in Granule Ingester (#15)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit dc005850664bd76161fdea20ce596720953b3ee2
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 19:28:07 2020 -0700

    SDAP-277: Improved error handling in Granule Ingester (#15)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 granule_ingester/conda-requirements.txt            |   2 +-
 .../granule_ingester/consumer/Consumer.py          |  40 +++++--
 .../granule_ingester/pipeline/Pipeline.py          | 132 +++++++++++++--------
 .../reading_processors/TileReadingProcessor.py     |  20 ++--
 granule_ingester/tests/pipeline/test_Pipeline.py   |   9 +-
 5 files changed, 130 insertions(+), 73 deletions(-)

diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 5df51fe..6c72837 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,6 +17,8 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -39,7 +41,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -47,10 +49,10 @@ class Consumer(HealthCheck):
             await connection.close()
             return True
         except Exception:
-            logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
-            return False
+            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+                                                 f"Connection string was {self._connection_string}")
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -75,19 +77,37 @@ class Consumer(HealthCheck):
                                             metadata_store_factory=metadata_store_factory,
                                             max_concurrency=pipeline_max_concurrency)
             await pipeline.run()
-            message.ack()
+            await message.ack()
+        except PipelineBuildingError as e:
+            await message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            await message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
-            message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            await message.reject(requeue=True)
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self, pipeline_max_concurrency=16):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message,
                                              self._data_store_factory,
                                              self._metadata_store_factory,
                                              pipeline_max_concurrency)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                await queue_iter.close()
+                await channel.close()
+                raise e
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e1e53bf..dabca81 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,38 +13,46 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
-import os
+import pickle
 import time
+from multiprocessing import Manager
 from typing import List
 
 import xarray as xr
 import yaml
 
-import aiomultiprocess
+from aiomultiprocess import Pool
+from aiomultiprocess.types import ProxyException
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
-from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.pipeline.Modules import \
+    modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.slicers import TileSlicer
 from granule_ingester.writers import DataStore, MetadataStore
 from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
 
 logger = logging.getLogger(__name__)
 
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
 _worker_processor_list: List[TileProcessor] = None
 _worker_dataset = None
+_shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
     global _worker_data_store
     global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
+    global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
     # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -52,19 +60,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
+    _shared_memory = shared_memory
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
-    global _worker_data_store
-    global _worker_metadata_store
-    global _worker_processor_list
-    global _worker_dataset
+    try:
+        input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+        processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
-    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-    if processed_tile:
-        await _worker_data_store.save_data(processed_tile)
-        await _worker_metadata_store.save_metadata(processed_tile)
+        if processed_tile:
+            await _worker_data_store.save_data(processed_tile)
+            await _worker_metadata_store.save_metadata(processed_tile)
+    except Exception as e:
+        pickling_support.install(e)
+        _shared_memory.error = pickle.dumps(e)
+        raise
 
 
 def _recurse(processor_list: List[TileProcessor],
@@ -91,25 +101,34 @@ class Pipeline:
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = max_concurrency
 
-    @classmethod
-    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings,
-                                   max_concurrency)
+        # Create a SyncManager so that we can communicate exceptions from the
+        # worker processes back to the main process.
+        self._manager = Manager()
+
+    def __del__(self):
+        self._manager.shutdown()
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings,
                                        max_concurrency)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " +
+                                        "was received is not valid YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
@@ -117,17 +136,27 @@ class Pipeline:
                         metadata_store_factory,
                         module_mappings: dict,
                         max_concurrency: int):
-        granule_loader = GranuleLoader(**config['granule'])
-
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
-
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
-
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
+
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
+
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
+
+            return cls(granule_loader,
+                       slicer,
+                       data_store_factory,
+                       metadata_store_factory,
+                       tile_processors,
+                       max_concurrency)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -144,23 +173,32 @@ class Pipeline:
     async def run(self):
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
-            async with aiomultiprocess.Pool(initializer=_init_worker,
-                                            initargs=(self._tile_processors,
-                                                      dataset,
-                                                      self._data_store_factory,
-                                                      self._metadata_store_factory),
-                                            maxtasksperchild=self._max_concurrency,
-                                            childconcurrency=self._max_concurrency) as pool:
+
+            shared_memory = self._manager.Namespace()
+            async with Pool(initializer=_init_worker,
+                            initargs=(self._tile_processors,
+                                      dataset,
+                                      self._data_store_factory,
+                                      self._metadata_store_factory,
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
+                        raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
     @staticmethod
-    def _chunk_list(items, chunk_size):
+    def _chunk_list(items, chunk_size: int):
         return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)