You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/09 15:26:28 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-259: The
Collection Manager now reloads the Collections Config file on an interval
instead of watching for filesystem events (#5)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 09f53d5 SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5)
09f53d5 is described below
commit 09f53d596fa5df439726a0d192094cc1256be201
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Jul 9 10:26:22 2020 -0500
SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5)
Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
collection_manager/collection_manager/main.py | 15 ++++--
.../services/CollectionWatcher.py | 58 ++++++++++++----------
collection_manager/docker/entrypoint.sh | 3 +-
.../tests/services/test_CollectionWatcher.py | 50 +++++++++++++------
4 files changed, 79 insertions(+), 47 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index d8d2a5a..7e72de5 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,7 +1,7 @@
import argparse
+import asyncio
import logging
import os
-import time
from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
@@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace:
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+ parser.add_argument('--refresh',
+ default=30, type=float,
+ metavar="INTERVAL",
+ help='Number of seconds after which to reload the collections config file. (Default: 30)')
return parser.parse_args()
-def main():
+async def main():
try:
options = get_args()
@@ -68,13 +72,14 @@ def main():
history_manager_builder=history_manager_builder)
collection_watcher = CollectionWatcher(collections_path=options.collections_path,
collection_updated_callback=collection_processor.process_collection,
- granule_updated_callback=collection_processor.process_granule)
+ granule_updated_callback=collection_processor.process_granule,
+ collections_refresh_interval=options.refresh)
collection_watcher.start_watching()
while True:
try:
- time.sleep(1)
+ await asyncio.sleep(1)
except KeyboardInterrupt:
return
@@ -84,4 +89,4 @@ def main():
if __name__ == "__main__":
- main()
+ asyncio.run(main())
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index a3c3bf7..2387016 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,9 @@
+import asyncio
import logging
import os
from collections import defaultdict
-from typing import Dict, Callable, Set
+from functools import partial
+from typing import Dict, Callable, Set, Optional
import yaml
from watchdog.events import FileSystemEventHandler
@@ -21,30 +23,31 @@ class CollectionWatcher:
def __init__(self,
collections_path: str,
collection_updated_callback: Callable[[Collection], any],
- granule_updated_callback: Callable[[str, Collection], any]):
+ granule_updated_callback: Callable[[str, Collection], any],
+ collections_refresh_interval: float = 30):
if not os.path.isabs(collections_path):
raise RelativePathError("Collections config path must be an absolute path.")
self._collections_path = collections_path
self._collection_updated_callback = collection_updated_callback
self._granule_updated_callback = granule_updated_callback
+ self._collections_refresh_interval = collections_refresh_interval
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
self._observer = Observer()
self._granule_watches = set()
- def start_watching(self):
+ def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
"""
- Start observing filesystem events for added/modified granules or changes to the Collections configuration file.
- When an event occurs, call the appropriate callback that was passed in during instantiation.
+ Periodically load the Collections Configuration file to check for changes,
+ and observe filesystem events for added/modified granules. When an event occurs,
+ call the appropriate callback that was passed in during instantiation.
:return: None
"""
- self._observer.schedule(
- _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule),
- os.path.dirname(self._collections_path))
+
+ self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
self._observer.start()
- self._reload_and_reschedule()
def collections(self) -> Set[Collection]:
"""
@@ -98,10 +101,12 @@ class CollectionWatcher:
def _reload_and_reschedule(self):
try:
- for collection in self._get_updated_collections():
+ updated_collections = self._get_updated_collections()
+ for collection in updated_collections:
self._collection_updated_callback(collection)
- self._unschedule_watches()
- self._schedule_watches()
+ if updated_collections:
+ self._unschedule_watches()
+ self._schedule_watches()
except CollectionConfigParsingError as e:
logger.error(e)
@@ -121,20 +126,21 @@ class CollectionWatcher:
bad_collection_names = ' and '.join([col.dataset_id for col in collections])
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
-
-class _CollectionEventHandler(FileSystemEventHandler):
- """
- EventHandler that watches for changes to the Collections config file.
- """
-
- def __init__(self, file_path: str, callback: Callable[[], any]):
- self._callback = callback
- self._file_path = file_path
-
- def on_modified(self, event):
- super().on_modified(event)
- if event.src_path == self._file_path:
- self._callback()
+ @classmethod
+ def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+ """
+ Call func now, then again wait_time seconds after each call returns or raises (non-blocking, uses asyncio).
+ :param loop: An optional event loop to use. If None, the current running event loop will be used.
+ :param wait_time: seconds to wait between iterations of func
+ :param func: the function that will be run
+ :param args: positional args passed to func on every call
+ """
+ if loop is None:
+ loop = asyncio.get_running_loop()
+ try:
+ func(*args)
+ finally:
+ loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func, *args))
class _GranuleEventHandler(FileSystemEventHandler):
diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index eb88f75..988dd2c 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
- $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
+ $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) \
+ $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 8c6ab5f..b954812 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -1,6 +1,6 @@
+import asyncio
import os
import tempfile
-import time
import unittest
from datetime import datetime
from unittest.mock import Mock
@@ -132,8 +132,13 @@ class TestCollectionWatcher(unittest.TestCase):
collections_config.write(collections_str.encode("utf-8"))
collection_callback = Mock()
- collection_watcher = CollectionWatcher(collections_config.name, collection_callback, Mock())
- collection_watcher.start_watching()
+ collection_watcher = CollectionWatcher(collections_path=collections_config.name,
+ collection_updated_callback=collection_callback,
+ granule_updated_callback=Mock(),
+ collections_refresh_interval=0.1)
+
+ loop = asyncio.new_event_loop()
+ collection_watcher.start_watching(loop)
collections_str = f"""
- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
@@ -143,9 +148,11 @@ class TestCollectionWatcher(unittest.TestCase):
forward-processing-priority: 5
"""
collections_config.write(collections_str.encode("utf-8"))
- collections_config.close()
- self.assert_called_within_timeout(collection_callback, timeout_sec=1, call_count=2)
+ loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2))
+
+ loop.close()
+ collections_config.close()
granule_dir.cleanup()
os.remove(collections_config.name)
@@ -164,12 +171,15 @@ collections:
granule_callback = Mock()
collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
- collection_watcher.start_watching()
+
+ loop = asyncio.new_event_loop()
+ collection_watcher.start_watching(loop)
new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
- self.assert_called_within_timeout(granule_callback)
+ loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
+ loop.close()
new_granule.close()
granule_dir.cleanup()
@@ -189,21 +199,31 @@ collections:
granule_callback = Mock()
collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
- collection_watcher.start_watching()
+
+ loop = asyncio.new_event_loop()
+ collection_watcher.start_watching(loop)
new_granule.write("hello world")
new_granule.close()
- self.assert_called_within_timeout(granule_callback)
-
+ loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
+ loop.close()
granule_dir.cleanup()
+ def test_run_periodically(self):
+ callback = Mock()
+ loop = asyncio.new_event_loop()
+ CollectionWatcher._run_periodically(loop, 0.1, callback)
+ loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2))
+ loop.close()
+
@staticmethod
- def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+ async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
- start = datetime.now()
+ start = asyncio.get_running_loop().time()
- while (datetime.now() - start).total_seconds() < timeout_sec:
+ while asyncio.get_running_loop().time() - start < timeout_sec:
- time.sleep(0.01)
+ await asyncio.sleep(0.01)
if mock_func.call_count >= call_count:
return
- raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec")
+ raise AssertionError(f"{mock_func} did not reach {call_count} calls within {timeout_sec} sec")
+