You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 01:16:47 UTC
[incubator-sdap-ingester] branch rabbitmq-fix created (now 67a9c6f)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.
at 67a9c6f better error handling
This branch includes the following new commits:
new 67a9c6f better error handling
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[incubator-sdap-ingester] 01/01: better error handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 67a9c6fba94aa36b46c6d87976479ca42e162285
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500
better error handling
---
.../granule_ingester/consumer/Consumer.py | 7 +++-
.../granule_ingester/exceptions/Exceptions.py | 2 ++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 39 +++++++++++-----------
granule_ingester/tests/pipeline/test_Pipeline.py | 9 ++---
5 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
logger = logging.getLogger(__name__)
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
metadata_store_factory=metadata_store_factory)
await pipeline.run()
message.ack()
+ except PipelineBuildingError as e:
+ message.reject()
+ logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+ f"from RabbitMQ. The exception was:\n{e}")
except Exception as e:
message.reject(requeue=True)
- logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+ logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self):
channel = await self._connection.channel()
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..4c03e48
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,2 @@
+class PipelineBuildingError(Exception):
+ pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..a36b19a
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import PipelineBuildingError
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
import logging
import time
from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
import aiomultiprocess
import xarray as xr
import yaml
+from yaml.scanner import ScannerError
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
- config = yaml.load(config_str, yaml.FullLoader)
- return cls._build_pipeline(config,
- data_store_factory,
- metadata_store_factory,
- processor_module_mappings)
-
- @classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
- with open(config_path) as config_file:
- config = yaml.load(config_file, yaml.FullLoader)
+ try:
+ config = yaml.load(config_str, yaml.FullLoader)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
processor_module_mappings)
+ except yaml.scanner.ScannerError:
+ raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
data_store_factory,
metadata_store_factory,
module_mappings: dict):
- granule_loader = GranuleLoader(**config['granule'])
+ try:
+ granule_loader = GranuleLoader(**config['granule'])
- slicer_config = config['slicer']
- slicer = cls._parse_module(slicer_config, module_mappings)
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
- tile_processors = []
- for processor_config in config['processors']:
- module = cls._parse_module(processor_config, module_mappings)
- tile_processors.append(module)
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ except KeyError as e:
+ raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+ except Exception:
+ raise PipelineBuildingError("Cannot build pipeline.")
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
pass
relative_path = "../config_files/ingestion_config_testfile.yaml"
- file_path = os.path.join(os.path.dirname(__file__), relative_path)
- pipeline = Pipeline.from_file(config_path=str(file_path),
- data_store_factory=MockDataStore,
- metadata_store_factory=MockMetadataStore)
+ with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+ yaml_str = file.read()
+ pipeline = Pipeline.from_string(config_str=yaml_str,
+ data_store_factory=MockDataStore,
+ metadata_store_factory=MockMetadataStore)
self.assertEqual(pipeline._data_store_factory, MockDataStore)
self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)