Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 01:16:47 UTC

[incubator-sdap-ingester] branch rabbitmq-fix created (now 67a9c6f)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


      at 67a9c6f  better error handling

This branch includes the following new commits:

     new 67a9c6f  better error handling

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-ingester] 01/01: better error handling

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 67a9c6fba94aa36b46c6d87976479ca42e162285
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500

    better error handling
---
 .../granule_ingester/consumer/Consumer.py          |  7 +++-
 .../granule_ingester/exceptions/Exceptions.py      |  2 ++
 .../granule_ingester/exceptions/__init__.py        |  1 +
 .../granule_ingester/pipeline/Pipeline.py          | 39 +++++++++++-----------
 granule_ingester/tests/pipeline/test_Pipeline.py   |  9 ++---
 5 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..31454c1 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -19,6 +19,7 @@ import aio_pika
 
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
+from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -74,9 +75,13 @@ class Consumer(HealthCheck):
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
             message.ack()
+        except PipelineBuildingError as e:
+            message.reject()
+            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+                             f"from RabbitMQ. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
-            logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
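
The net effect of the Consumer change above: a message whose pipeline config cannot be built is dropped, while any other failure is requeued for another attempt. A standalone sketch of that dispatch, with FakeMessage and a local exception class as illustrative stand-ins for the real aio_pika message and granule_ingester.exceptions.PipelineBuildingError (not code from this commit):

    import asyncio

    class PipelineBuildingError(Exception):
        """Local stand-in for granule_ingester.exceptions.PipelineBuildingError."""

    class FakeMessage:
        """Local stand-in for an aio_pika incoming message."""

        def ack(self):
            print("ack: granule processed, message removed from the queue")

        def reject(self, requeue=False):
            print(f"reject: requeue={requeue}")

    async def handle(message, run_pipeline):
        try:
            await run_pipeline()
            message.ack()
        except PipelineBuildingError:
            # A broken config will never succeed, so drop the message rather than requeue it.
            message.reject()
        except Exception:
            # Anything else (e.g. a flaky data store) may be transient, so put the message back.
            message.reject(requeue=True)

    async def broken_config():
        raise PipelineBuildingError("syntax error in the YAML")

    asyncio.run(handle(FakeMessage(), broken_config))  # prints: reject: requeue=False
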
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
new file mode 100644
index 0000000..4c03e48
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -0,0 +1,2 @@
+class PipelineBuildingError(Exception):
+    pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
new file mode 100644
index 0000000..a36b19a
--- /dev/null
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import PipelineBuildingError
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..f872e4d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,10 +17,11 @@
 import logging
 import time
 from typing import List
-
+from granule_ingester.exceptions import PipelineBuildingError
 import aiomultiprocess
 import xarray as xr
 import yaml
+from yaml.scanner import ScannerError
 from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.granule_loaders import GranuleLoader
@@ -90,38 +91,38 @@ class Pipeline:
 
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
-        config = yaml.load(config_str, yaml.FullLoader)
-        return cls._build_pipeline(config,
-                                   data_store_factory,
-                                   metadata_store_factory,
-                                   processor_module_mappings)
-
-    @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
-        with open(config_path) as config_file:
-            config = yaml.load(config_file, yaml.FullLoader)
+        try:
+            config = yaml.load(config_str, yaml.FullLoader)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
                                        processor_module_mappings)
 
+        except yaml.scanner.ScannerError:
+            raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
                         module_mappings: dict):
-        granule_loader = GranuleLoader(**config['granule'])
+        try:
+            granule_loader = GranuleLoader(**config['granule'])
 
-        slicer_config = config['slicer']
-        slicer = cls._parse_module(slicer_config, module_mappings)
+            slicer_config = config['slicer']
+            slicer = cls._parse_module(slicer_config, module_mappings)
 
-        tile_processors = []
-        for processor_config in config['processors']:
-            module = cls._parse_module(processor_config, module_mappings)
-            tile_processors.append(module)
+            tile_processors = []
+            for processor_config in config['processors']:
+                module = cls._parse_module(processor_config, module_mappings)
+                tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+            return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        except KeyError as e:
+            raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+        except Exception:
+            raise PipelineBuildingError("Cannot build pipeline.")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
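
For reference, _build_pipeline reads three top-level sections from the parsed config: 'granule', 'slicer', and 'processors'. A quick sketch of how a missing section now surfaces to callers, assuming the granule_ingester package is importable (the factories are never reached in this case, so None is passed):

    from granule_ingester.exceptions import PipelineBuildingError
    from granule_ingester.pipeline import Pipeline

    # 'granule' is absent, so _build_pipeline hits the new KeyError branch and
    # re-raises it as a PipelineBuildingError instead of an opaque KeyError.
    config_str = "slicer: {}\nprocessors: []"
    try:
        Pipeline.from_string(config_str=config_str,
                             data_store_factory=None,
                             metadata_store_factory=None)
    except PipelineBuildingError as e:
        print(e)  # Cannot build pipeline because 'granule' is missing from the YAML.
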
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
                 pass
 
         relative_path = "../config_files/ingestion_config_testfile.yaml"
-        file_path = os.path.join(os.path.dirname(__file__), relative_path)
-        pipeline = Pipeline.from_file(config_path=str(file_path),
-                                      data_store_factory=MockDataStore,
-                                      metadata_store_factory=MockMetadataStore)
+        with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+            yaml_str = file.read()
+        pipeline = Pipeline.from_string(config_str=yaml_str,
+                                        data_store_factory=MockDataStore,
+                                        metadata_store_factory=MockMetadataStore)
 
         self.assertEqual(pipeline._data_store_factory, MockDataStore)
         self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
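
A possible companion test for the new failure path, sketched here as its own TestCase (not part of this commit):

    import unittest

    from granule_ingester.exceptions import PipelineBuildingError
    from granule_ingester.pipeline import Pipeline

    class TestPipelineErrors(unittest.TestCase):
        def test_from_string_with_bad_yaml_raises(self):
            # "a: b: c"-style input makes yaml raise a ScannerError, which
            # from_string now converts into a PipelineBuildingError.
            with self.assertRaises(PipelineBuildingError):
                Pipeline.from_string(config_str="granule: slicer: processors:",
                                     data_store_factory=None,
                                     metadata_store_factory=None)

    if __name__ == '__main__':
        unittest.main()
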