You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/10 15:55:20 UTC

[incubator-sdap-ingester] 01/04: better error handling

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 8a12adff653aec9be3155272ca4e96882bf03246
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500

    better error handling
---
 .../granule_ingester/consumer/Consumer.py          |  7 +++-
 .../granule_ingester/exceptions/Exceptions.py      |  2 ++
 .../granule_ingester/exceptions/__init__.py        |  1 +
 .../granule_ingester/pipeline/Pipeline.py          | 39 +++++++++++-----------
 granule_ingester/tests/pipeline/test_Pipeline.py   |  9 ++---
 5 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
 
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
             message.ack()
+        except PipelineBuildingError as e:
+            message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..4c03e48
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,2 @@
+class PipelineBuildingError(Exception):
+    pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..a36b19a
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import PipelineBuildingError
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
 import logging
 import time
 from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
 import aiomultiprocess
 import xarray as xr
 import yaml
+from yaml.scanner import ScannerError
 from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings)
-
-    @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
                         module_mappings: dict):
-        granule_loader = GranuleLoader(**config['granule'])
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
 
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
 
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+            return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)