You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/06 17:21:03 UTC
[incubator-sdap-ingester] 01/01: reload collections config on an interval
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch collection-config-refresh
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 0c85575a24ec98b27a86fe42e5e3185202252f98
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 6 12:20:43 2020 -0500
reload collections config on an interval
---
collection_manager/collection_manager/main.py | 17 +++++---
.../services/CollectionWatcher.py | 50 +++++++++++-----------
collection_manager/docker/entrypoint.sh | 1 +
collection_manager/requirements.txt | 1 +
4 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index d8d2a5a..45d88fc 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,7 +1,7 @@
import argparse
+import asyncio
import logging
import os
-import time
from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
@@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace:
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+ parser.add_argument('--refresh',
+ type=int, default=30,
+ metavar="INTERVAL",
+ help='Number of seconds after which to reload the collections config file. (Default: 30)')
return parser.parse_args()
-def main():
+async def main():
try:
options = get_args()
@@ -68,13 +72,14 @@ def main():
history_manager_builder=history_manager_builder)
collection_watcher = CollectionWatcher(collections_path=options.collections_path,
collection_updated_callback=collection_processor.process_collection,
- granule_updated_callback=collection_processor.process_granule)
+ granule_updated_callback=collection_processor.process_granule,
+ collections_refresh_interval=options.refresh)
- collection_watcher.start_watching()
+ await collection_watcher.start_watching()
while True:
try:
- time.sleep(1)
+ await asyncio.sleep(1)
except KeyboardInterrupt:
return
@@ -84,4 +89,4 @@ def main():
if __name__ == "__main__":
- main()
+ asyncio.run(main())
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index a3c3bf7..7a73d9b 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,8 @@ import logging
import os
from collections import defaultdict
from typing import Dict, Callable, Set
-
+import asyncio
+
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -21,30 +22,30 @@ class CollectionWatcher:
def __init__(self,
collections_path: str,
collection_updated_callback: Callable[[Collection], any],
- granule_updated_callback: Callable[[str, Collection], any]):
+ granule_updated_callback: Callable[[str, Collection], any],
+ collections_refresh_interval: int = 30):
if not os.path.isabs(collections_path):
raise RelativePathError("Collections config path must be an absolute path.")
self._collections_path = collections_path
self._collection_updated_callback = collection_updated_callback
self._granule_updated_callback = granule_updated_callback
+ self._collections_refresh_interval = collections_refresh_interval
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
self._observer = Observer()
self._granule_watches = set()
- def start_watching(self):
+ async def start_watching(self):
"""
Start observing filesystem events for added/modified granules or changes to the Collections configuration file.
When an event occurs, call the appropriate callback that was passed in during instantiation.
:return: None
"""
- self._observer.schedule(
- _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule),
- os.path.dirname(self._collections_path))
+ # Poll the collections config on a timer; keep a reference to the task so asyncio cannot garbage-collect it.
+ self._refresh_task = asyncio.create_task(run_periodically(self._collections_refresh_interval, self._reload_and_reschedule))
self._observer.start()
- self._reload_and_reschedule()
def collections(self) -> Set[Collection]:
"""
@@ -98,10 +99,12 @@ class CollectionWatcher:
def _reload_and_reschedule(self):
try:
- for collection in self._get_updated_collections():
+ updated_collections = self._get_updated_collections()
+ for collection in updated_collections:
self._collection_updated_callback(collection)
- self._unschedule_watches()
- self._schedule_watches()
+ if updated_collections:
+ self._unschedule_watches()
+ self._schedule_watches()
except CollectionConfigParsingError as e:
logger.error(e)
@@ -122,21 +125,6 @@ class CollectionWatcher:
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
-class _CollectionEventHandler(FileSystemEventHandler):
- """
- EventHandler that watches for changes to the Collections config file.
- """
-
- def __init__(self, file_path: str, callback: Callable[[], any]):
- self._callback = callback
- self._file_path = file_path
-
- def on_modified(self, event):
- super().on_modified(event)
- if event.src_path == self._file_path:
- self._callback()
-
-
class _GranuleEventHandler(FileSystemEventHandler):
"""
EventHandler that watches for new or modified granule files.
@@ -160,3 +148,15 @@ class _GranuleEventHandler(FileSystemEventHandler):
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
self._callback(event.src_path, collection)
+
+
+async def run_periodically(wait_time: int, func: Callable, *args):
+    """Call func(*args) forever, sleeping wait_time seconds between calls.
+    Exceptions raised by func are logged so one failure does not end the loop.
+    """
+    while True:
+        try:
+            func(*args)
+        except Exception:
+            logger.exception("Periodic call to %r failed; retrying in %s seconds.", func, wait_time)
+        await asyncio.sleep(wait_time)
diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index eb88f75..988dd2c 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
+ $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) \
$([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..27501c9 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,4 @@ pysolr==3.8.1
pika==1.1.0
watchdog==0.10.2
requests==2.23.0
+schedule==0.6.0
\ No newline at end of file