You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/10 15:55:20 UTC
[incubator-sdap-ingester] 01/04: better error handling
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 8a12adff653aec9be3155272ca4e96882bf03246
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500
better error handling
---
.../granule_ingester/consumer/Consumer.py | 7 +++-
.../granule_ingester/exceptions/Exceptions.py | 2 ++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 39 +++++++++++-----------
granule_ingester/tests/pipeline/test_Pipeline.py | 9 ++---
5 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
logger = logging.getLogger(__name__)
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
metadata_store_factory=metadata_store_factory)
await pipeline.run()
message.ack()
+ except PipelineBuildingError as e:
+ message.reject()
+ logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+ f"from RabbitMQ. The exception was:\n{e}")
except Exception as e:
message.reject(requeue=True)
- logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+ logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self):
channel = await self._connection.channel()
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..4c03e48
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,2 @@
+class PipelineBuildingError(Exception):
+ pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..a36b19a
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import PipelineBuildingError
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
import logging
import time
from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
import aiomultiprocess
import xarray as xr
import yaml
+from yaml.scanner import ScannerError
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
- config = yaml.load(config_str, yaml.FullLoader)
- return cls._build_pipeline(config,
- data_store_factory,
- metadata_store_factory,
- processor_module_mappings)
-
- @classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
- with open(config_path) as config_file:
- config = yaml.load(config_file, yaml.FullLoader)
+ try:
+ config = yaml.load(config_str, yaml.FullLoader)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
processor_module_mappings)
+ except yaml.scanner.ScannerError:
+ raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
data_store_factory,
metadata_store_factory,
module_mappings: dict):
- granule_loader = GranuleLoader(**config['granule'])
+ try:
+ granule_loader = GranuleLoader(**config['granule'])
- slicer_config = config['slicer']
- slicer = cls._parse_module(slicer_config, module_mappings)
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
- tile_processors = []
- for processor_config in config['processors']:
- module = cls._parse_module(processor_config, module_mappings)
- tile_processors.append(module)
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ except KeyError as e:
+ raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+ except Exception:
+ raise PipelineBuildingError("Cannot build pipeline.")
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
pass
relative_path = "../config_files/ingestion_config_testfile.yaml"
- file_path = os.path.join(os.path.dirname(__file__), relative_path)
- pipeline = Pipeline.from_file(config_path=str(file_path),
- data_store_factory=MockDataStore,
- metadata_store_factory=MockMetadataStore)
+ with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+ yaml_str = file.read()
+ pipeline = Pipeline.from_string(config_str=yaml_str,
+ data_store_factory=MockDataStore,
+ metadata_store_factory=MockMetadataStore)
self.assertEqual(pipeline._data_store_factory, MockDataStore)
self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)