You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/10 20:38:53 UTC

[incubator-sdap-ingester] branch scheduler created (now 9054094)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


      at 9054094  allow multiple collections to use the same directory

This branch includes the following new commits:

     new 9054094  allow multiple collections to use the same directory

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sdap-ingester] 01/01: allow multiple collections to use the same directory

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 9054094b1e362a4f159b213bc5d5aab74becec32
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jun 9 17:02:55 2020 -0700

    allow multiple collections to use the same directory
---
 .../services/CollectionProcessor.py                | 10 ++--
 .../services/CollectionWatcher.py                  | 59 ++++++++++------------
 .../history_manager/FileIngestionHistory.py        |  5 +-
 3 files changed, 33 insertions(+), 41 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 40da416..75a86e2 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -48,14 +48,16 @@ class CollectionProcessor:
         granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
-            logger.info(f"New granule '{granule}' detected for forward-processing ingestion.")
+            logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
+                        f"in collection '{collection.dataset_id}'.")
             use_priority = collection.forward_processing_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
-            logger.info(f"New granule '{granule}' detected for historical ingestion.")
+            logger.info(f"New granule '{granule}' detected for historical ingestion in collection "
+                        f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired time range. "
-                        f"Skipping.")
+            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                        f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(collection, config_template=self._config_template)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 912ddad..b1fca64 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import logging
 import os
-from typing import List, Dict, Callable
+from collections import defaultdict
+from typing import List, Dict, Callable, Set
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -14,15 +15,16 @@ logger.setLevel(logging.DEBUG)
 
 
 class CollectionWatcher:
-    def __init__(self, collections_path: str,
+    def __init__(self,
+                 collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
         self._collections_path = collections_path
-        self._collection_updated = collection_updated_callback
-        self._granule_updated = granule_updated_callback
+        self._collection_updated_callback = collection_updated_callback
+        self._granule_updated_callback = granule_updated_callback
+        
+        self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
-        self._watches = {}
-        self._collections: Dict[str, Collection] = {}
 
     def start_watching(self):
         """
@@ -32,21 +34,21 @@ class CollectionWatcher:
         """
         self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path, callback=self._refresh),
                                 os.path.dirname(self._collections_path))
-        self._refresh()
         self._observer.start()
+        self._refresh()
 
     def collections(self) -> List[Collection]:
         """
         Return a list of all Collections being watched.
         :return: A list of Collections
         """
-        return list(self._collections.values())
+        return [collection for collections in self._collections_by_dir.values() for collection in collections]
 
     def _load_collections(self):
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
-            new_collections = {}
+            self._collections_by_dir.clear()
             for _, collection_dict in collections_yaml.items():
                 collection = Collection.from_dict(collection_dict)
                 directory = collection.directory()
@@ -55,25 +57,20 @@ class CollectionWatcher:
                                  f"which is the same directory as the collection configuration file, "
                                  f"{self._collections_path}. The granules need to be in their own directory. "
                                  f"Ignoring collection {collection.dataset_id} for now.")
-                if directory in new_collections:
-                    logger.error(f"Ingestion order {collection.dataset_id} uses granule directory {directory} "
-                                 f"which conflicts with ingestion order {new_collections[directory].dataset_id}."
-                                 f" Ignoring {collection.dataset_id}.")
                 else:
-                    new_collections[directory] = collection
+                    self._collections_by_dir[directory].add(collection)
 
-            self._collections = new_collections
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at {self._collections}.")
+            logger.error(f"Collection configuration file not found at {self._collections_path}.")
         except yaml.scanner.ScannerError:
             logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections "
                          f"after the next configuration change.")
 
     def _refresh(self):
         for collection in self._get_updated_collections():
-            self._collection_updated(collection)
+            self._collection_updated_callback(collection)
 
-        self._unschedule_watches()
+        self._observer.unschedule_all()
         self._schedule_watches()
 
     def _get_updated_collections(self) -> List[Collection]:
@@ -81,17 +78,12 @@ class CollectionWatcher:
         self._load_collections()
         return list(set(self.collections()) - set(old_collections))
 
-    def _unschedule_watches(self):
-        for directory, watch in self._watches.items():
-            self._observer.unschedule(watch)
-        self._watches.clear()
-
     def _schedule_watches(self):
-        for collection in self.collections():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated, collection)
-            directory = collection.directory()
-            if directory not in self._watches:
-                self._watches[directory] = self._observer.schedule(granule_event_handler, directory)
+        for directory, collections in self._collections_by_dir.items():
+            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            # Note: the Watchdog library does not schedule a new watch
+            # if one is already scheduled for the same directory
+            self._observer.schedule(granule_event_handler, directory)
 
 
 class _CollectionEventHandler(FileSystemEventHandler):
@@ -114,11 +106,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, granule_updated: Callable[[str, Collection], any], collection: Collection):
-        self._granule_updated = granule_updated
-        self._collection = collection
+    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+        self._callback = callback
+        self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
-        if self._collection.owns_file(event.src_path):
-            self._granule_updated(event.src_path, self._collection)
+        for collection in self._collections_for_dir:
+            if collection.owns_file(event.src_path):
+                self._callback(event.src_path, collection)
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 9fab784..0a92317 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -40,20 +40,17 @@ class FileIngestionHistory(IngestionHistory):
 
         self._latest_ingested_file_update_file_path = os.path.join(history_path, f'{dataset_id}.ts')
         if os.path.exists(self._latest_ingested_file_update_file_path):
-            logger.info(f"read latest ingested file update date from {self._latest_ingested_file_update_file_path}")
             with open(self._latest_ingested_file_update_file_path, 'r') as f_ts:
                 self._latest_ingested_file_update = float(f_ts.readline())
 
     def _load_history_dict(self):
-        logger.info(f"loading history file {self._history_file_path}")
         try:
             with open(self._history_file_path, 'r') as f_history:
                 for line in f_history:
                     filename, md5sum = line.strip().split(',')
-                    logger.info(f"add to history file {filename} with md5sum {md5sum}")
                     self._history_dict[filename] = md5sum
         except FileNotFoundError:
-            logger.info("no history file created yet")
+            logger.info("No history file created yet")
 
     def __del__(self):
         self._history_file.close()