You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/22 23:52:06 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-254, SDAP-255, SDAP-256: Fix bug where ingestion history is not saved, fix bug where messages published to RabbitMQ are incorrect, fix bug where bad collection config file crashes app (#3)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5850e16  SDAP-254, SDAP-255, SDAP-256: Fix bug where ingestion history is not saved, fix bug where messages published to RabbitMQ are incorrect, fix bug where bad collection config file crashes app (#3)
5850e16 is described below

commit 5850e1656deb380a14e9c05c9127b4fafb74f832
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Jun 22 16:51:58 2020 -0700

    SDAP-254, SDAP-255, SDAP-256: Fix bug where ingestion history is not saved, fix bug where messages published to RabbitMQ are incorrect, fix bug where bad collection config file crashes app (#3)
---
 .../entities/exceptions/Exceptions.py                  |  2 +-
 .../collection_manager/entities/exceptions/__init__.py |  2 +-
 collection_manager/collection_manager/main.py          |  6 +++++-
 .../collection_manager/services/CollectionProcessor.py |  6 +++---
 .../collection_manager/services/CollectionWatcher.py   | 18 +++++++++++++-----
 .../services/history_manager/FileIngestionHistory.py   |  4 ++--
 .../services/history_manager/IngestionHistory.py       |  6 ++++++
 .../services/history_manager/SolrIngestionHistory.py   |  3 +--
 ...{collections_bad.yml => collections_bad_schema.yml} |  4 ++--
 ...{collections_bad.yml => collections_bad_syntax.yml} |  0
 .../tests/services/test_CollectionProcessor.py         |  6 +++---
 .../tests/services/test_CollectionWatcher.py           | 12 +++++++++---
 12 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
index 8e63d24..c18c4c8 100644
--- a/collection_manager/collection_manager/entities/exceptions/Exceptions.py
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -2,7 +2,7 @@ class RelativePathError(Exception):
     pass
 
 
-class YamlParsingError(Exception):
+class CollectionConfigParsingError(Exception):
     pass
 
 
diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py
index 9a22c16..7fac507 100644
--- a/collection_manager/collection_manager/entities/exceptions/__init__.py
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -3,4 +3,4 @@ from .Exceptions import ConflictingPathCollectionError
 from .Exceptions import MissingValueCollectionError
 from .Exceptions import RelativePathCollectionError
 from .Exceptions import RelativePathError
-from .Exceptions import YamlParsingError
+from .Exceptions import CollectionConfigParsingError
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index a10446f..bc2d356 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -70,8 +70,12 @@ def main():
                                                granule_updated_callback=collection_processor.process_granule)
 
         collection_watcher.start_watching()
+
         while True:
-            time.sleep(1)
+            try:
+                time.sleep(1)
+            except KeyboardInterrupt:
+                return
 
     except Exception as e:
         logger.error(e)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index a81390b..232cdee 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -63,7 +63,7 @@ class CollectionProcessor:
                         f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
-        dataset_config = self._fill_template(collection, config_template=self._config_template)
+        dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         self._publisher.publish_message(body=dataset_config, priority=use_priority)
         history_manager.push(granule)
 
@@ -78,11 +78,11 @@ class CollectionProcessor:
         return self._history_manager_cache[dataset_id]
 
     @staticmethod
-    def _fill_template(collection: Collection, config_template: str) -> str:
+    def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str:
         renderer = pystache.Renderer()
         config_content = renderer.render(config_template,
                                          {
-                                             'granule': collection.path,
+                                             'granule': granule_path,
                                              'dataset_id': collection.dataset_id,
                                              'variable': collection.variable
                                          })
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 6bbe7d9..a3c3bf7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -9,7 +9,7 @@ from watchdog.observers import Observer
 from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \
+from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
     CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \
     RelativePathCollectionError
 
@@ -84,8 +84,12 @@ class CollectionWatcher:
             raise CollectionConfigFileNotFoundError("The collection config file could not be found at "
                                                     f"{self._collections_path}")
         except yaml.scanner.ScannerError:
-            raise YamlParsingError("Bad YAML syntax in collection configuration file. Will attempt to reload "
-                                   "collections after the next configuration change.")
+            raise CollectionConfigParsingError("Bad YAML syntax in collection configuration file. Will attempt "
+                                               "to reload collections after the next configuration change.")
+        except KeyError:
+            raise CollectionConfigParsingError("The collections configuration YAML file does not conform to the "
+                                               "proper schema. Will attempt to reload collections config after the "
+                                               "next file modification.")
 
     def _get_updated_collections(self) -> Set[Collection]:
         old_collections = self.collections()
@@ -98,7 +102,7 @@ class CollectionWatcher:
                 self._collection_updated_callback(collection)
             self._unschedule_watches()
             self._schedule_watches()
-        except YamlParsingError as e:
+        except CollectionConfigParsingError as e:
             logger.error(e)
 
     def _unschedule_watches(self):
@@ -111,7 +115,11 @@ class CollectionWatcher:
             granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
-            self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+            try:
+                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+            except (FileNotFoundError, NotADirectoryError):
+                bad_collection_names = ' and '.join([col.dataset_id for col in collections])
+                logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
 
 class _CollectionEventHandler(FileSystemEventHandler):
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 0a92317..50f2170 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -20,6 +20,7 @@ class FileIngestionHistoryBuilder(IngestionHistoryBuilder):
                                     signature_fun=self._signature_fun)
 
 
+# TODO: clean this up, better tests
 class FileIngestionHistory(IngestionHistory):
 
     def __init__(self, history_path: str, dataset_id: str, signature_fun=None):
@@ -55,7 +56,6 @@ class FileIngestionHistory(IngestionHistory):
     def __del__(self):
         self._history_file.close()
         self._purge()
-        self._save_latest_timestamp()
         del self._history_dict
 
     def reset_cache(self):
@@ -91,8 +91,8 @@ class FileIngestionHistory(IngestionHistory):
 
     def _push_record(self, file_name, signature):
         self._history_dict[file_name] = signature
+
         self._history_file.write(f'{file_name},{signature}\n')
-        return None
 
     def _get_signature(self, file_name):
         return self._history_dict.get(file_name, None)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index b14b409..d92cb24 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -53,6 +53,8 @@ class IngestionHistory(ABC):
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
+        self._save_latest_timestamp()
+
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
         Return the modified time of the most recently modified file that was ingested.
@@ -100,6 +102,10 @@ class IngestionHistory(ABC):
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
+    def _save_latest_timestamp(self):
+        pass
+
+    @abstractmethod
     def _push_record(self, file_name, signature):
         pass
 
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 2d0438f..1ae7156 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -44,7 +44,6 @@ class SolrIngestionHistory(IngestionHistory):
             raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")
 
     def __del__(self):
-        self._push_latest_ingested_date()
         self._req_session.close()
 
     def _push_record(self, file_name, signature):
@@ -58,7 +57,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
-    def _push_latest_ingested_date(self):
+    def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
             self._solr_datasets.add([{
diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad_schema.yml
similarity index 94%
copy from collection_manager/tests/resources/collections_bad.yml
copy to collection_manager/tests/resources/collections_bad_schema.yml
index cac6a32..37c6ad3 100644
--- a/collection_manager/tests/resources/collections_bad.yml
+++ b/collection_manager/tests/resources/collections_bad_schema.yml
@@ -1,10 +1,10 @@
-collections:
+bad_key:
   - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
     path: /opt/data/grace/*land*.nc
     variable: lwe_thickness
     priority: 1
     forward-processing-priority: 5
-BAD SYNTAX!
+
   - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
     path: /opt/data/grace/*ocean*.nc
     variable: lwe_thickness
diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad_syntax.yml
similarity index 100%
rename from collection_manager/tests/resources/collections_bad.yml
rename to collection_manager/tests/resources/collections_bad_syntax.yml
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index 7899e22..56d5393 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -46,7 +46,7 @@ class TestCollectionProcessor(unittest.TestCase):
 
         expected = """
         granule:
-          resource: test_path
+          resource: /granules/test_granule.nc
         processors:
           - name: GridReadingProcessor
             variable_to_read: test_variable
@@ -54,13 +54,13 @@ class TestCollectionProcessor(unittest.TestCase):
             dataset_name: test_dataset
             """
         collection = Collection(dataset_id="test_dataset",
-                                path="test_path",
+                                path="/granules/test*.nc",
                                 variable="test_variable",
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
                                 date_to=None)
-        filled = CollectionProcessor._fill_template(collection, template)
+        filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template)
         self.assertEqual(filled, expected)
 
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 7ae25a1..8c6ab5f 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -6,7 +6,7 @@ from datetime import datetime
 from unittest.mock import Mock
 
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError, \
+from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \
     RelativePathCollectionError, ConflictingPathCollectionError
 from collection_manager.services import CollectionWatcher
 
@@ -38,10 +38,16 @@ class TestCollectionWatcher(unittest.TestCase):
         self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1)
 
     def test_load_collections_with_bad_yaml_syntax(self):
-        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad.yml')
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_syntax.yml')
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
-        self.assertRaises(YamlParsingError, collection_watcher._load_collections)
+        self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections)
+
+    def test_load_collections_with_bad_schema(self):
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_schema.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections)
 
     def test_load_collections_with_file_not_found(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/does_not_exist.yml')