Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/10 20:38:54 UTC
[incubator-sdap-ingester] 01/01: allow multiple collections to use the same directory
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 9054094b1e362a4f159b213bc5d5aab74becec32
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jun 9 17:02:55 2020 -0700
allow multiple collections to use the same directory
---
.../services/CollectionProcessor.py | 10 ++--
.../services/CollectionWatcher.py | 59 ++++++++++------------
.../history_manager/FileIngestionHistory.py | 5 +-
3 files changed, 33 insertions(+), 41 deletions(-)
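In short, the watcher's bookkeeping changes from one collection per granule directory to a set of collections per directory. A minimal standalone sketch of the new grouping, using a hypothetical stand-in for the project's Collection class (field and dataset names here are illustrative, not taken from the repo):

    from collections import defaultdict
    from dataclasses import dataclass
    from typing import Dict, Set

    @dataclass(frozen=True)   # frozen so instances are hashable and can live in a set
    class Collection:         # hypothetical stand-in, not collection_manager's real class
        dataset_id: str
        path: str

        def directory(self) -> str:
            return self.path

    # Replaces the old Dict[str, Collection], which allowed only one collection per directory.
    collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
    for c in (Collection("dataset-a", "/data/granules"),
              Collection("dataset-b", "/data/granules")):
        collections_by_dir[c.directory()].add(c)

    assert len(collections_by_dir["/data/granules"]) == 2   # both share one watch

With a set keyed by directory, reloading the configuration is just clear() plus re-add, and duplicate entries collapse naturally.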
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 40da416..75a86e2 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -48,14 +48,16 @@ class CollectionProcessor:
         granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)

         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
-            logger.info(f"New granule '{granule}' detected for forward-processing ingestion.")
+            logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
+                        f"in collection '{collection.dataset_id}'.")
             use_priority = collection.forward_processing_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
-            logger.info(f"New granule '{granule}' detected for historical ingestion.")
+            logger.info(f"New granule '{granule}' detected for historical ingestion in collection "
+                        f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired time range. "
-                        f"Skipping.")
+            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                        f"time range for collection '{collection.dataset_id}'. Skipping.")
             return

         dataset_config = self._fill_template(collection, config_template=self._config_template)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 912ddad..b1fca64 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import logging
 import os
-from typing import List, Dict, Callable
+from collections import defaultdict
+from typing import List, Dict, Callable, Set

 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -14,15 +15,16 @@ logger.setLevel(logging.DEBUG)


 class CollectionWatcher:
-    def __init__(self, collections_path: str,
+    def __init__(self,
+                 collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
         self._collections_path = collections_path
-        self._collection_updated = collection_updated_callback
-        self._granule_updated = granule_updated_callback
+        self._collection_updated_callback = collection_updated_callback
+        self._granule_updated_callback = granule_updated_callback
+
+        self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
-        self._watches = {}
-        self._collections: Dict[str, Collection] = {}

     def start_watching(self):
         """
@@ -32,21 +34,21 @@
         """
         self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path, callback=self._refresh),
                                 os.path.dirname(self._collections_path))
-        self._refresh()
         self._observer.start()
+        self._refresh()

     def collections(self) -> List[Collection]:
         """
         Return a list of all Collections being watched.
         :return: A list of Collections
         """
-        return list(self._collections.values())
+        return [collection for collections in self._collections_by_dir.values() for collection in collections]

     def _load_collections(self):
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
-            new_collections = {}
+            self._collections_by_dir.clear()
             for _, collection_dict in collections_yaml.items():
                 collection = Collection.from_dict(collection_dict)
                 directory = collection.directory()
@@ -55,25 +57,20 @@
                                  f"which is the same directory as the collection configuration file, "
                                  f"{self._collections_path}. The granules need to be in their own directory. "
                                  f"Ignoring collection {collection.dataset_id} for now.")
-                if directory in new_collections:
-                    logger.error(f"Ingestion order {collection.dataset_id} uses granule directory {directory} "
-                                 f"which conflicts with ingestion order {new_collections[directory].dataset_id}."
-                                 f" Ignoring {collection.dataset_id}.")
                 else:
-                    new_collections[directory] = collection
+                    self._collections_by_dir[directory].add(collection)

-            self._collections = new_collections
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at {self._collections}.")
+            logger.error(f"Collection configuration file not found at {self._collections_path}.")
         except yaml.scanner.ScannerError:
             logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections "
                          f"after the next configuration change.")

     def _refresh(self):
         for collection in self._get_updated_collections():
-            self._collection_updated(collection)
+            self._collection_updated_callback(collection)

-        self._unschedule_watches()
+        self._observer.unschedule_all()
         self._schedule_watches()

     def _get_updated_collections(self) -> List[Collection]:
@@ -81,17 +78,12 @@
         self._load_collections()
         return list(set(self.collections()) - set(old_collections))

-    def _unschedule_watches(self):
-        for directory, watch in self._watches.items():
-            self._observer.unschedule(watch)
-        self._watches.clear()
-
     def _schedule_watches(self):
-        for collection in self.collections():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated, collection)
-            directory = collection.directory()
-            if directory not in self._watches:
-                self._watches[directory] = self._observer.schedule(granule_event_handler, directory)
+        for directory, collections in self._collections_by_dir.items():
+            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            # Note: the Watchdog library does not schedule a new watch
+            # if one is already scheduled for the same directory
+            self._observer.schedule(granule_event_handler, directory)


 class _CollectionEventHandler(FileSystemEventHandler):
@@ -114,11 +106,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """

-    def __init__(self, granule_updated: Callable[[str, Collection], any], collection: Collection):
-        self._granule_updated = granule_updated
-        self._collection = collection
+    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+        self._callback = callback
+        self._collections_for_dir = collections_for_dir

     def on_created(self, event):
         super().on_created(event)
-        if self._collection.owns_file(event.src_path):
-            self._granule_updated(event.src_path, self._collection)
+        for collection in self._collections_for_dir:
+            if collection.owns_file(event.src_path):
+                self._callback(event.src_path, collection)
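The comment in _schedule_watches relies on Watchdog keying watches by (path, recursive), so re-scheduling an already-watched directory reuses the existing watch. A quick sanity-check sketch of that assumption (requires the watchdog package; the path is illustrative):

    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    observer = Observer()
    watch_a = observer.schedule(FileSystemEventHandler(), "/tmp")
    watch_b = observer.schedule(FileSystemEventHandler(), "/tmp")
    # Watchdog compares watches by (path, recursive), so the second schedule()
    # attaches the handler to the existing watch rather than creating a new one.
    assert watch_a == watch_b
    observer.unschedule_all()   # what _refresh() now calls instead of manual bookkeeping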
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 9fab784..0a92317 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -40,20 +40,17 @@ class FileIngestionHistory(IngestionHistory):
         self._latest_ingested_file_update_file_path = os.path.join(history_path, f'{dataset_id}.ts')
         if os.path.exists(self._latest_ingested_file_update_file_path):
-            logger.info(f"read latest ingested file update date from {self._latest_ingested_file_update_file_path}")
             with open(self._latest_ingested_file_update_file_path, 'r') as f_ts:
                 self._latest_ingested_file_update = float(f_ts.readline())

     def _load_history_dict(self):
-        logger.info(f"loading history file {self._history_file_path}")
         try:
             with open(self._history_file_path, 'r') as f_history:
                 for line in f_history:
                     filename, md5sum = line.strip().split(',')
-                    logger.info(f"add to history file {filename} with md5sum {md5sum}")
                     self._history_dict[filename] = md5sum
         except FileNotFoundError:
-            logger.info("no history file created yet")
+            logger.info("No history file created yet")

     def __del__(self):
         self._history_file.close()
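For reference, the history file parsed in _load_history_dict is a plain two-column CSV of granule filename and md5 checksum. A minimal sketch of the same parse outside the class (file name and contents are illustrative):

    # Illustrative history file, one "filename,md5sum" pair per line, e.g.:
    #   20200609-granule.nc,9e107d9d372bb6826bd81d3542a419d6
    history_dict = {}
    try:
        with open("dataset-a.history", 'r') as f_history:   # illustrative path
            for line in f_history:
                filename, md5sum = line.strip().split(',')
                history_dict[filename] = md5sum
    except FileNotFoundError:
        pass   # no history file created yet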