You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this text version.)
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/06 17:21:03 UTC

[incubator-sdap-ingester] 01/01: reload collections config on an interval

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch collection-config-refresh
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 0c85575a24ec98b27a86fe42e5e3185202252f98
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 6 12:20:43 2020 -0500

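    Reload the collections config on a fixed interval (new --refresh option, default 30 seconds) instead of watching the config file for filesystem change events.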
---
 collection_manager/collection_manager/main.py      | 17 +++++---
 .../services/CollectionWatcher.py                  | 50 +++++++++++-----------
 collection_manager/docker/entrypoint.sh            |  1 +
 collection_manager/requirements.txt                |  1 +
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index d8d2a5a..45d88fc 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,7 +1,7 @@
 import argparse
+import asyncio
 import logging
 import os
-import time
 
 from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
 from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
@@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace:
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+    parser.add_argument('--refresh',
+                        default='30',
+                        metavar="INTERVAL",
+                        help='Number of seconds after which to reload the collections config file. (Default: 30)')
 
     return parser.parse_args()
 
 
-def main():
+async def main():
     try:
         options = get_args()
 
@@ -68,13 +72,14 @@ def main():
                                                    history_manager_builder=history_manager_builder)
         collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                collection_updated_callback=collection_processor.process_collection,
-                                               granule_updated_callback=collection_processor.process_granule)
+                                               granule_updated_callback=collection_processor.process_granule,
+                                               collections_refresh_interval=int(options.refresh))
 
-        collection_watcher.start_watching()
+        await collection_watcher.start_watching()
 
         while True:
             try:
-                time.sleep(1)
+                await asyncio.sleep(1)
             except KeyboardInterrupt:
                 return
 
@@ -84,4 +89,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index a3c3bf7..7a73d9b 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,8 @@ import logging
 import os
 from collections import defaultdict
 from typing import Dict, Callable, Set
-
+import schedule
+import asyncio
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
@@ -21,30 +22,30 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
-                 granule_updated_callback: Callable[[str, Collection], any]):
+                 granule_updated_callback: Callable[[str, Collection], any],
+                 collections_refresh_interval: int = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
 
         self._collections_path = collections_path
         self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
+        self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
 
         self._granule_watches = set()
 
-    def start_watching(self):
+    async def start_watching(self):
         """
         Start observing filesystem events for added/modified granules or changes to the Collections configuration file.
         When an event occurs, call the appropriate callback that was passed in during instantiation.
         :return: None
         """
-        self._observer.schedule(
-            _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule),
-            os.path.dirname(self._collections_path))
+
+        asyncio.create_task(run_periodically(self._collections_refresh_interval, self._reload_and_reschedule))
         self._observer.start()
-        self._reload_and_reschedule()
 
     def collections(self) -> Set[Collection]:
         """
@@ -98,10 +99,12 @@ class CollectionWatcher:
 
     def _reload_and_reschedule(self):
         try:
-            for collection in self._get_updated_collections():
+            updated_collections = self._get_updated_collections()
+            for collection in updated_collections:
                 self._collection_updated_callback(collection)
-            self._unschedule_watches()
-            self._schedule_watches()
+            if len(updated_collections) > 0:
+                self._unschedule_watches()
+                self._schedule_watches()
         except CollectionConfigParsingError as e:
             logger.error(e)
 
@@ -122,21 +125,6 @@ class CollectionWatcher:
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
 
-class _CollectionEventHandler(FileSystemEventHandler):
-    """
-    EventHandler that watches for changes to the Collections config file.
-    """
-
-    def __init__(self, file_path: str, callback: Callable[[], any]):
-        self._callback = callback
-        self._file_path = file_path
-
-    def on_modified(self, event):
-        super().on_modified(event)
-        if event.src_path == self._file_path:
-            self._callback()
-
-
 class _GranuleEventHandler(FileSystemEventHandler):
     """
     EventHandler that watches for new or modified granule files.
@@ -160,3 +148,15 @@ class _GranuleEventHandler(FileSystemEventHandler):
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
                 self._callback(event.src_path, collection)
+
+
+async def run_periodically(wait_time: int, func: Callable, *args):
+    """
+    Wraps a function in a coroutine that will run the given function indefinitely
+    :param wait_time: seconds to wait between iterations of func
+    :param func: the function that will be run
+    :param args: any args that need to be provided to func
+    """
+    while True:
+        func(*args)
+        await asyncio.sleep(wait_time)
diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index eb88f75..988dd2c 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
   $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
+  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..27501c9 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,4 @@ pysolr==3.8.1
 pika==1.1.0
 watchdog==0.10.2
 requests==2.23.0
+schedule==0.6.0
\ No newline at end of file