You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/06 23:16:11 UTC
[incubator-sdap-ingester] 04/05: SDAP-277: Improved error handling
in Granule Ingester (#15)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit dc005850664bd76161fdea20ce596720953b3ee2
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 19:28:07 2020 -0700
SDAP-277: Improved error handling in Granule Ingester (#15)
Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
granule_ingester/conda-requirements.txt | 2 +-
.../granule_ingester/consumer/Consumer.py | 40 +++++--
.../granule_ingester/pipeline/Pipeline.py | 132 +++++++++++++--------
.../reading_processors/TileReadingProcessor.py | 20 ++--
granule_ingester/tests/pipeline/test_Pipeline.py | 9 +-
5 files changed, 130 insertions(+), 73 deletions(-)
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
pyyaml==5.3.1
requests==2.23.0
aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 5df51fe..6c72837 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,6 +17,8 @@ import logging
import aio_pika
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+ RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -39,7 +41,7 @@ class Consumer(HealthCheck):
self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
password=rabbitmq_password,
host=rabbitmq_host)
- self._connection = None
+ self._connection: aio_pika.Connection = None
async def health_check(self) -> bool:
try:
@@ -47,10 +49,10 @@ class Consumer(HealthCheck):
await connection.close()
return True
except Exception:
- logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
- return False
+ raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+ f"Connection string was {self._connection_string}")
- async def _get_connection(self):
+ async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
async def __aenter__(self):
@@ -75,19 +77,37 @@ class Consumer(HealthCheck):
metadata_store_factory=metadata_store_factory,
max_concurrency=pipeline_max_concurrency)
await pipeline.run()
- message.ack()
+ await message.ack()
+ except PipelineBuildingError as e:
+ await message.reject()
+ logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+ f"from RabbitMQ. The exception was:\n{e}")
+ except PipelineRunningError as e:
+ await message.reject()
+ logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+ except LostConnectionError:
+ # Let main() handle this
+ raise
except Exception as e:
- message.reject(requeue=True)
- logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+ await message.reject(requeue=True)
+ logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self, pipeline_max_concurrency=16):
channel = await self._connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
- async with queue.iterator() as queue_iter:
- async for message in queue_iter:
+ queue_iter = queue.iterator()
+ async for message in queue_iter:
+ try:
await self._received_message(message,
self._data_store_factory,
self._metadata_store_factory,
pipeline_max_concurrency)
+ except aio_pika.exceptions.MessageProcessError:
+ # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+ # connection has died, and attempting to close the queue will only raise another exception.
+ raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
+ except Exception as e:
+ await queue_iter.close()
+ await channel.close()
+ raise e
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e1e53bf..dabca81 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,38 +13,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
-import os
+import pickle
import time
+from multiprocessing import Manager
from typing import List
import xarray as xr
import yaml
-import aiomultiprocess
+from aiomultiprocess import Pool
+from aiomultiprocess.types import ProxyException
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
-from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.pipeline.Modules import \
+ modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
from granule_ingester.slicers import TileSlicer
from granule_ingester.writers import DataStore, MetadataStore
from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
logger = logging.getLogger(__name__)
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
global _worker_data_store
global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
+ global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
# however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -52,19 +60,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
+ _shared_memory = shared_memory
async def _process_tile_in_worker(serialized_input_tile: str):
- global _worker_data_store
- global _worker_metadata_store
- global _worker_processor_list
- global _worker_dataset
+ try:
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- if processed_tile:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+ except Exception as e:
+ pickling_support.install(e)
+ _shared_memory.error = pickle.dumps(e)
+ raise
def _recurse(processor_list: List[TileProcessor],
@@ -91,25 +101,34 @@ class Pipeline:
self._metadata_store_factory = metadata_store_factory
self._max_concurrency = max_concurrency
- @classmethod
- def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
- config = yaml.load(config_str, yaml.FullLoader)
- return cls._build_pipeline(config,
- data_store_factory,
- metadata_store_factory,
- processor_module_mappings,
- max_concurrency)
+ # Create a SyncManager so that we can communicate exceptions from the
+ # worker processes back to the main process.
+ self._manager = Manager()
+
+ def __del__(self):
+ self._manager.shutdown()
@classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
- with open(config_path) as config_file:
- config = yaml.load(config_file, yaml.FullLoader)
+ def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
+ try:
+ config = yaml.load(config_str, yaml.FullLoader)
+ cls._validate_config(config)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
processor_module_mappings,
max_concurrency)
+ except yaml.scanner.ScannerError:
+ raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
+ # TODO: this method should validate the config against an actual schema definition
+ @staticmethod
+ def _validate_config(config: dict):
+ if type(config) is not dict:
+ raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " +
+ "was received is not valid YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
@@ -117,17 +136,27 @@ class Pipeline:
metadata_store_factory,
module_mappings: dict,
max_concurrency: int):
- granule_loader = GranuleLoader(**config['granule'])
-
- slicer_config = config['slicer']
- slicer = cls._parse_module(slicer_config, module_mappings)
-
- tile_processors = []
- for processor_config in config['processors']:
- module = cls._parse_module(processor_config, module_mappings)
- tile_processors.append(module)
-
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
+ try:
+ granule_loader = GranuleLoader(**config['granule'])
+
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
+
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
+
+ return cls(granule_loader,
+ slicer,
+ data_store_factory,
+ metadata_store_factory,
+ tile_processors,
+ max_concurrency)
+ except KeyError as e:
+ raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+ except Exception:
+ raise PipelineBuildingError("Cannot build pipeline.")
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -144,23 +173,32 @@ class Pipeline:
async def run(self):
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
- async with aiomultiprocess.Pool(initializer=_init_worker,
- initargs=(self._tile_processors,
- dataset,
- self._data_store_factory,
- self._metadata_store_factory),
- maxtasksperchild=self._max_concurrency,
- childconcurrency=self._max_concurrency) as pool:
+
+ shared_memory = self._manager.Namespace()
+ async with Pool(initializer=_init_worker,
+ initargs=(self._tile_processors,
+ dataset,
+ self._data_store_factory,
+ self._metadata_store_factory,
+ shared_memory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
- await pool.map(_process_tile_in_worker, chunk)
+ for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
+ try:
+ await pool.map(_process_tile_in_worker, chunk)
+ except ProxyException:
+ pool.terminate()
+ # Give the shared memory manager some time to write the exception
+ # await asyncio.sleep(1)
+ raise pickle.loads(shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
@staticmethod
- def _chunk_list(items, chunk_size):
+ def _chunk_list(items, chunk_size: int):
return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
import xarray as xr
from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
self.latitude = latitude
self.longitude = longitude
- # Common optional properties
- self.temp_dir = None
- self.metadata = None
- # self.temp_dir = self.environ['TEMP_DIR']
- # self.metadata = self.environ['META']
-
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
- dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+ try:
+ dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
- output_tile = nexusproto.NexusTile()
- output_tile.CopyFrom(tile)
- output_tile.summary.data_var_name = self.variable_to_read
+ output_tile = nexusproto.NexusTile()
+ output_tile.CopyFrom(tile)
+ output_tile.summary.data_var_name = self.variable_to_read
- return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ except Exception:
+ raise TileProcessingError("Could not generate tiles from the granule.")
@abstractmethod
def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
pass
relative_path = "../config_files/ingestion_config_testfile.yaml"
- file_path = os.path.join(os.path.dirname(__file__), relative_path)
- pipeline = Pipeline.from_file(config_path=str(file_path),
- data_store_factory=MockDataStore,
- metadata_store_factory=MockMetadataStore)
+ with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+ yaml_str = file.read()
+ pipeline = Pipeline.from_string(config_str=yaml_str,
+ data_store_factory=MockDataStore,
+ metadata_store_factory=MockMetadataStore)
self.assertEqual(pipeline._data_store_factory, MockDataStore)
self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)