Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/10 20:38:54 UTC

[incubator-sdap-ingester] 01/01: allow multiple collections to use the same directory

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 9054094b1e362a4f159b213bc5d5aab74becec32
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jun 9 17:02:55 2020 -0700

    allow multiple collections to use the same directory
---
 .../services/CollectionProcessor.py                | 10 ++--
 .../services/CollectionWatcher.py                  | 59 ++++++++++------------
 .../history_manager/FileIngestionHistory.py        |  5 +-
 3 files changed, 33 insertions(+), 41 deletions(-)
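
With this commit, two or more ingestion orders in the collections configuration may point at the same granule directory; the watcher now groups collections by directory instead of logging a conflict and ignoring the later one. As a rough sketch only -- the key names below are assumptions for illustration, not the project's actual schema -- a configuration like the following would now be accepted:

    # Hypothetical collections configuration; key names are illustrative assumptions.
    avhrr-sst-day:
      id: avhrr-sst-day            # would map to Collection.dataset_id
      path: /data/avhrr/granules   # would map to Collection.directory()
    avhrr-sst-night:
      id: avhrr-sst-night
      path: /data/avhrr/granules   # same granule directory as the collection above

A single watch is scheduled on the shared directory, and each new granule is offered to every collection whose owns_file() check accepts it.
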

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 40da416..75a86e2 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -48,14 +48,16 @@ class CollectionProcessor:
         granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
-            logger.info(f"New granule '{granule}' detected for forward-processing ingestion.")
+            logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
+                        f"in collection '{collection.dataset_id}'.")
             use_priority = collection.forward_processing_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
-            logger.info(f"New granule '{granule}' detected for historical ingestion.")
+            logger.info(f"New granule '{granule}' detected for historical ingestion in collection "
+                        f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired time range. "
-                        f"Skipping.")
+            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                        f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(collection, config_template=self._config_template)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 912ddad..b1fca64 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import logging
 import os
-from typing import List, Dict, Callable
+from collections import defaultdict
+from typing import List, Dict, Callable, Set
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -14,15 +15,16 @@ logger.setLevel(logging.DEBUG)
 
 
 class CollectionWatcher:
-    def __init__(self, collections_path: str,
+    def __init__(self,
+                 collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
         self._collections_path = collections_path
-        self._collection_updated = collection_updated_callback
-        self._granule_updated = granule_updated_callback
+        self._collection_updated_callback = collection_updated_callback
+        self._granule_updated_callback = granule_updated_callback
+        
+        self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
-        self._watches = {}
-        self._collections: Dict[str, Collection] = {}
 
     def start_watching(self):
         """
@@ -32,21 +34,21 @@ class CollectionWatcher:
         """
         self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path, callback=self._refresh),
                                 os.path.dirname(self._collections_path))
-        self._refresh()
         self._observer.start()
+        self._refresh()
 
     def collections(self) -> List[Collection]:
         """
         Return a list of all Collections being watched.
         :return: A list of Collections
         """
-        return list(self._collections.values())
+        return [collection for collections in self._collections_by_dir.values() for collection in collections]
 
     def _load_collections(self):
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
-            new_collections = {}
+            self._collections_by_dir.clear()
             for _, collection_dict in collections_yaml.items():
                 collection = Collection.from_dict(collection_dict)
                 directory = collection.directory()
@@ -55,25 +57,20 @@ class CollectionWatcher:
                                  f"which is the same directory as the collection configuration file, "
                                  f"{self._collections_path}. The granules need to be in their own directory. "
                                  f"Ignoring collection {collection.dataset_id} for now.")
-                if directory in new_collections:
-                    logger.error(f"Ingestion order {collection.dataset_id} uses granule directory {directory} "
-                                 f"which conflicts with ingestion order {new_collections[directory].dataset_id}."
-                                 f" Ignoring {collection.dataset_id}.")
                 else:
-                    new_collections[directory] = collection
+                    self._collections_by_dir[directory].add(collection)
 
-            self._collections = new_collections
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at {self._collections}.")
+            logger.error(f"Collection configuration file not found at {self._collections_path}.")
         except yaml.scanner.ScannerError:
             logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections "
                          f"after the next configuration change.")
 
     def _refresh(self):
         for collection in self._get_updated_collections():
-            self._collection_updated(collection)
+            self._collection_updated_callback(collection)
 
-        self._unschedule_watches()
+        self._observer.unschedule_all()
         self._schedule_watches()
 
     def _get_updated_collections(self) -> List[Collection]:
@@ -81,17 +78,12 @@ class CollectionWatcher:
         self._load_collections()
         return list(set(self.collections()) - set(old_collections))
 
-    def _unschedule_watches(self):
-        for directory, watch in self._watches.items():
-            self._observer.unschedule(watch)
-        self._watches.clear()
-
     def _schedule_watches(self):
-        for collection in self.collections():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated, collection)
-            directory = collection.directory()
-            if directory not in self._watches:
-                self._watches[directory] = self._observer.schedule(granule_event_handler, directory)
+        for directory, collections in self._collections_by_dir.items():
+            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            # Note: the Watchdog library does not schedule a new watch
+            # if one is already scheduled for the same directory
+            self._observer.schedule(granule_event_handler, directory)
 
 
 class _CollectionEventHandler(FileSystemEventHandler):
@@ -114,11 +106,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, granule_updated: Callable[[str, Collection], any], collection: Collection):
-        self._granule_updated = granule_updated
-        self._collection = collection
+    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+        self._callback = callback
+        self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
-        if self._collection.owns_file(event.src_path):
-            self._granule_updated(event.src_path, self._collection)
+        for collection in self._collections_for_dir:
+            if collection.owns_file(event.src_path):
+                self._callback(event.src_path, collection)
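
The watch bookkeeping above replaces the hand-maintained _watches dict with watchdog's own unschedule_all(). A minimal standalone sketch of that pattern (the handler and directory paths are made up for illustration; only the watchdog calls are real API):

    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    class PrintHandler(FileSystemEventHandler):
        def on_created(self, event):
            # React to a file newly created in a watched directory.
            print(f"created: {event.src_path}")

    observer = Observer()
    observer.start()

    def reschedule(directories):
        # Drop every existing watch in one call instead of tracking
        # ObservedWatch objects per directory, then re-register.
        observer.unschedule_all()
        for directory in directories:
            observer.schedule(PrintHandler(), directory)

    reschedule(["/tmp/granules-a", "/tmp/granules-b"])  # hypothetical paths
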
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 9fab784..0a92317 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -40,20 +40,17 @@ class FileIngestionHistory(IngestionHistory):
 
         self._latest_ingested_file_update_file_path = os.path.join(history_path, f'{dataset_id}.ts')
         if os.path.exists(self._latest_ingested_file_update_file_path):
-            logger.info(f"read latest ingested file update date from {self._latest_ingested_file_update_file_path}")
             with open(self._latest_ingested_file_update_file_path, 'r') as f_ts:
                 self._latest_ingested_file_update = float(f_ts.readline())
 
     def _load_history_dict(self):
-        logger.info(f"loading history file {self._history_file_path}")
         try:
             with open(self._history_file_path, 'r') as f_history:
                 for line in f_history:
                     filename, md5sum = line.strip().split(',')
-                    logger.info(f"add to history file {filename} with md5sum {md5sum}")
                     self._history_dict[filename] = md5sum
         except FileNotFoundError:
-            logger.info("no history file created yet")
+            logger.info("No history file created yet")
 
     def __del__(self):
         self._history_file.close()
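
Taken together, the CollectionWatcher changes amount to grouping collections by their granule directory and fanning each filesystem event out to every collection that claims the file. A simplified, self-contained sketch of that grouping and dispatch (the Collection stand-in below is invented for illustration and is not the repository's class):

    import os
    from collections import defaultdict
    from dataclasses import dataclass
    from typing import Callable, Dict, Set

    @dataclass(frozen=True)
    class Collection:
        # Stand-in for the real Collection class; only what the sketch needs.
        dataset_id: str
        directory: str
        pattern: str

        def owns_file(self, path: str) -> bool:
            return path.startswith(self.directory) and self.pattern in path

    collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)

    def register(collection: Collection) -> None:
        # Several collections may share one directory; a set per directory
        # replaces the old one-collection-per-directory mapping.
        collections_by_dir[collection.directory].add(collection)

    def on_created(path: str, callback: Callable[[str, Collection], None]) -> None:
        # Offer the new file to every collection watching its directory.
        for collection in collections_by_dir.get(os.path.dirname(path), set()):
            if collection.owns_file(path):
                callback(path, collection)

    register(Collection("avhrr-day", "/data/avhrr", "DAY"))
    register(Collection("avhrr-night", "/data/avhrr", "NIGHT"))
    on_created("/data/avhrr/20200609-DAY.nc",
               lambda path, c: print(f"ingest {path} for {c.dataset_id}"))
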