Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 15:26:28 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 09f53d5  SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5)
09f53d5 is described below

commit 09f53d596fa5df439726a0d192094cc1256be201
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Jul 9 10:26:22 2020 -0500

    SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 collection_manager/collection_manager/main.py      | 15 ++++--
 .../services/CollectionWatcher.py                  | 56 ++++++++++++----------
 collection_manager/docker/entrypoint.sh            |  1 +
 .../tests/services/test_CollectionWatcher.py       | 44 ++++++++++++-----
 4 files changed, 73 insertions(+), 43 deletions(-)
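
For context on the change described above: instead of a watchdog handler on the
config file, the commit schedules a callback that re-arms itself with
loop.call_later. A minimal standalone sketch of that pattern, with illustrative
names that are not part of this commit:

    import asyncio

    def run_periodically(loop, wait_time, func, *args):
        # Run func now, then ask the event loop to invoke this scheduler again
        # after wait_time seconds; each call re-arms the next one.
        func(*args)
        loop.call_later(wait_time, run_periodically, loop, wait_time, func, *args)

    async def main():
        loop = asyncio.get_running_loop()
        run_periodically(loop, 0.5, print, "reloading collections config...")
        await asyncio.sleep(2)  # keep the loop alive so the callbacks can fire

    asyncio.run(main())

Because call_later measures the delay from the moment it is called (i.e. after
func returns), the effective period is roughly wait_time plus however long func
itself takes to run.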

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index d8d2a5a..7e72de5 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,7 +1,7 @@
 import argparse
+import asyncio
 import logging
 import os
-import time
 
 from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
 from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
@@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace:
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+    parser.add_argument('--refresh',
+                        default='30',
+                        metavar="INTERVAL",
+                        help='Number of seconds after which to reload the collections config file. (Default: 30)')
 
     return parser.parse_args()
 
 
-def main():
+async def main():
     try:
         options = get_args()
 
@@ -68,13 +72,14 @@ def main():
                                                    history_manager_builder=history_manager_builder)
         collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                collection_updated_callback=collection_processor.process_collection,
-                                               granule_updated_callback=collection_processor.process_granule)
+                                               granule_updated_callback=collection_processor.process_granule,
+                                               collections_refresh_interval=int(options.refresh))
 
         collection_watcher.start_watching()
 
         while True:
             try:
-                time.sleep(1)
+                await asyncio.sleep(1)
             except KeyboardInterrupt:
                 return
 
@@ -84,4 +89,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index a3c3bf7..2387016 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,9 @@
+import asyncio
 import logging
 import os
 from collections import defaultdict
-from typing import Dict, Callable, Set
+from functools import partial
+from typing import Dict, Callable, Set, Optional
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -21,30 +23,31 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
-                 granule_updated_callback: Callable[[str, Collection], any]):
+                 granule_updated_callback: Callable[[str, Collection], any],
+                 collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
 
         self._collections_path = collections_path
         self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
+        self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
 
         self._granule_watches = set()
 
-    def start_watching(self):
+    def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
         """
-        Start observing filesystem events for added/modified granules or changes to the Collections configuration file.
-        When an event occurs, call the appropriate callback that was passed in during instantiation.
+        Periodically load the Collections Configuration file to check for changes,
+        and observe filesystem events for added/modified granules. When an event occurs,
+        call the appropriate callback that was passed in during instantiation.
         :return: None
         """
-        self._observer.schedule(
-            _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule),
-            os.path.dirname(self._collections_path))
+
+        self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
         self._observer.start()
-        self._reload_and_reschedule()
 
     def collections(self) -> Set[Collection]:
         """
@@ -98,10 +101,12 @@ class CollectionWatcher:
 
     def _reload_and_reschedule(self):
         try:
-            for collection in self._get_updated_collections():
+            updated_collections = self._get_updated_collections()
+            for collection in updated_collections:
                 self._collection_updated_callback(collection)
-            self._unschedule_watches()
-            self._schedule_watches()
+            if len(updated_collections) > 0:
+                self._unschedule_watches()
+                self._schedule_watches()
         except CollectionConfigParsingError as e:
             logger.error(e)
 
@@ -121,20 +126,19 @@ class CollectionWatcher:
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
-
-class _CollectionEventHandler(FileSystemEventHandler):
-    """
-    EventHandler that watches for changes to the Collections config file.
-    """
-
-    def __init__(self, file_path: str, callback: Callable[[], any]):
-        self._callback = callback
-        self._file_path = file_path
-
-    def on_modified(self, event):
-        super().on_modified(event)
-        if event.src_path == self._file_path:
-            self._callback()
+    @classmethod
+    def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the function that will be run
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        func(*args)
+        loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
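
A note on the block above: when no loop is passed in, _run_periodically falls
back to asyncio.get_running_loop(), which raises RuntimeError unless a loop is
already running. That is why main() is now a coroutine driven by asyncio.run(),
and why the unit tests create and pass a loop explicitly. A rough sketch of the
two calling patterns, with illustrative names that are not from this commit:

    import asyncio

    def start_periodic_reload(loop=None):
        # Outside a running loop, get_running_loop() raises RuntimeError, so the
        # caller must either be inside asyncio.run(...) or pass a loop explicitly.
        if loop is None:
            loop = asyncio.get_running_loop()
        loop.call_later(0.1, print, "reload collections config")

    async def main():
        start_periodic_reload()   # fine: a loop is running here
        await asyncio.sleep(0.3)  # keep it alive long enough for the callback

    asyncio.run(main())

    # Explicit-loop variant, as the tests do:
    loop = asyncio.new_event_loop()
    start_periodic_reload(loop)
    loop.run_until_complete(asyncio.sleep(0.3))
    loop.close()
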
diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index eb88f75..988dd2c 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
   $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
+  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 8c6ab5f..b954812 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -1,6 +1,6 @@
+import asyncio
 import os
 import tempfile
-import time
 import unittest
 from datetime import datetime
 from unittest.mock import Mock
@@ -132,8 +132,13 @@ class TestCollectionWatcher(unittest.TestCase):
         collections_config.write(collections_str.encode("utf-8"))
 
         collection_callback = Mock()
-        collection_watcher = CollectionWatcher(collections_config.name, collection_callback, Mock())
-        collection_watcher.start_watching()
+        collection_watcher = CollectionWatcher(collections_path=collections_config.name,
+                                               collection_updated_callback=collection_callback,
+                                               granule_updated_callback=Mock(),
+                                               collections_refresh_interval=0.1)
+
+        loop = asyncio.new_event_loop()
+        collection_watcher.start_watching(loop)
 
         collections_str = f"""
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
@@ -143,9 +148,11 @@ class TestCollectionWatcher(unittest.TestCase):
   forward-processing-priority: 5
         """
         collections_config.write(collections_str.encode("utf-8"))
-        collections_config.close()
 
-        self.assert_called_within_timeout(collection_callback, timeout_sec=1, call_count=2)
+        loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2))
+
+        loop.close()
+        collections_config.close()
         granule_dir.cleanup()
         os.remove(collections_config.name)
 
@@ -164,12 +171,15 @@ collections:
 
             granule_callback = Mock()
             collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
-            collection_watcher.start_watching()
+
+            loop = asyncio.new_event_loop()
+            collection_watcher.start_watching(loop)
 
             new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
 
-            self.assert_called_within_timeout(granule_callback)
+            loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
 
+            loop.close()
             new_granule.close()
             granule_dir.cleanup()
 
@@ -189,21 +199,31 @@ collections:
 
             granule_callback = Mock()
             collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
-            collection_watcher.start_watching()
+
+            loop = asyncio.new_event_loop()
+            collection_watcher.start_watching(loop)
 
             new_granule.write("hello world")
             new_granule.close()
 
-            self.assert_called_within_timeout(granule_callback)
-
+            loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
+            loop.close()
             granule_dir.cleanup()
 
+    def test_run_periodically(self):
+        callback = Mock()
+        loop = asyncio.new_event_loop()
+        CollectionWatcher._run_periodically(loop, 0.1, callback)
+        loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2))
+        loop.close()
+
     @staticmethod
-    def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+    async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
         start = datetime.now()
 
         while (datetime.now() - start).total_seconds() < timeout_sec:
-            time.sleep(0.01)
+            await asyncio.sleep(0.01)
             if mock_func.call_count >= call_count:
                 return
         raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec")
+
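
A closing note on the test changes: assert_called_within_timeout is now a
coroutine, so polling the Mock with "await asyncio.sleep(...)" yields control
back to the event loop between checks, letting the callbacks scheduled by
start_watching via loop.call_later actually run. A self-contained sketch of the
same polling pattern, illustrative rather than part of this commit:

    import asyncio
    from datetime import datetime
    from unittest.mock import Mock

    async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
        # Poll the mock, yielding to the event loop between checks so that
        # callbacks scheduled with loop.call_later get a chance to run.
        start = datetime.now()
        while (datetime.now() - start).total_seconds() < timeout_sec:
            await asyncio.sleep(0.01)
            if mock_func.call_count >= call_count:
                return
        raise AssertionError(f"{mock_func} was not called {call_count} time(s) within {timeout_sec} sec")

    callback = Mock()
    loop = asyncio.new_event_loop()
    loop.call_later(0.05, callback)
    loop.run_until_complete(assert_called_within_timeout(callback))
    loop.close()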