You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by tl...@apache.org on 2021/04/09 01:17:26 UTC
[incubator-sdap-ingester] 07/33: SDAP-269: Switch to using aio-pika in collection-manager to maintain an asynchronous connection to RabbitMQ (#7)
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit d748429329084cb19966c80393acaa4e53162391
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Aug 3 12:51:56 2020 -0700
SDAP-269: Switch to using aio-pika in collection-manager to maintain an asynchronous connection to RabbitMQ (#7)
Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
.gitignore | 4 +-
collection_manager/README.md | 7 ++-
collection_manager/collection_manager/main.py | 36 ++++++------
.../services/CollectionProcessor.py | 8 +--
.../services/CollectionWatcher.py | 45 +++++++++------
.../services/MessagePublisher.py | 41 +++++++-------
collection_manager/requirements.txt | 3 +-
.../tests/services/test_CollectionProcessor.py | 37 +++++++-----
.../tests/services/test_CollectionWatcher.py | 66 ++++++++--------------
common/common/__init__.py | 0
common/common/async_test_utils/AsyncTestUtils.py | 28 +++++++++
common/common/async_test_utils/__init__.py | 1 +
common/setup.py | 21 +++++++
granule_ingester/.gitignore | 9 ---
granule_ingester/tests/writers/test_SolrStore.py | 5 +-
15 files changed, 178 insertions(+), 133 deletions(-)
diff --git a/.gitignore b/.gitignore
index e7c73b8..46e4151 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,6 @@
credentials.json
token.pickle
tmp
-sdap_ingest_manager.egg-info
venv
__pycache__/
dist/
@@ -11,3 +10,6 @@ build/
*.DS_Store
.eggs
temp/
+*.pyc
+*.vscode
+*.code-workspace
\ No newline at end of file
diff --git a/collection_manager/README.md b/collection_manager/README.md
index 9d00cbb..771f355 100644
--- a/collection_manager/README.md
+++ b/collection_manager/README.md
@@ -64,10 +64,11 @@ collections:
```
## Running the tests
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester/`, run:
- $ pip install pytest
- $ pytest
+ $ cd common && python setup.py install
+ $ cd ../collection_manager && python setup.py install
+ $ pip install pytest && pytest
## Building the Docker image
From `incubator-sdap-ingester/collection_manager`, run:
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 7e72de5..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,28 +63,26 @@ async def main():
history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
else:
history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
- publisher = MessagePublisher(host=options.rabbitmq_host,
- username=options.rabbitmq_username,
- password=options.rabbitmq_password,
- queue=options.rabbitmq_queue)
- publisher.connect()
- collection_processor = CollectionProcessor(message_publisher=publisher,
- history_manager_builder=history_manager_builder)
- collection_watcher = CollectionWatcher(collections_path=options.collections_path,
- collection_updated_callback=collection_processor.process_collection,
- granule_updated_callback=collection_processor.process_granule,
- collections_refresh_interval=int(options.refresh))
+ async with MessagePublisher(host=options.rabbitmq_host,
+ username=options.rabbitmq_username,
+ password=options.rabbitmq_password,
+ queue=options.rabbitmq_queue) as publisher:
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+ history_manager_builder=history_manager_builder)
+ collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+ collection_updated_callback=collection_processor.process_collection,
+ granule_updated_callback=collection_processor.process_granule,
+ collections_refresh_interval=int(options.refresh))
- collection_watcher.start_watching()
-
- while True:
- try:
- await asyncio.sleep(1)
- except KeyboardInterrupt:
- return
+ await collection_watcher.start_watching()
+ while True:
+ try:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ return
except Exception as e:
- logger.error(e)
+ logger.exception(e)
return
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
self._config_template = config_template_file.read()
- def process_collection(self, collection: Collection):
+ async def process_collection(self, collection: Collection):
"""
Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
:param collection: A Collection definition
:return: None
"""
for granule in collection.files_owned():
- self.process_granule(granule, collection)
+ await self.process_granule(granule, collection)
- def process_granule(self, granule: str, collection: Collection):
+ async def process_granule(self, granule: str, collection: Collection):
"""
Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
:param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
- self._publisher.publish_message(body=dataset_config, priority=use_priority)
+ await self._publisher.publish_message(body=dataset_config, priority=use_priority)
history_manager.push(granule)
@staticmethod
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..8911806 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,9 +2,7 @@ import asyncio
import logging
import os
from collections import defaultdict
-from functools import partial
-from typing import Dict, Callable, Set, Optional
-
+from typing import Dict, Callable, Set, Optional, Awaitable
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -22,8 +20,8 @@ logger.setLevel(logging.DEBUG)
class CollectionWatcher:
def __init__(self,
collections_path: str,
- collection_updated_callback: Callable[[Collection], any],
- granule_updated_callback: Callable[[str, Collection], any],
+ collection_updated_callback: Callable[[Collection], Awaitable],
+ granule_updated_callback: Callable[[str, Collection], Awaitable],
collections_refresh_interval: float = 30):
if not os.path.isabs(collections_path):
raise RelativePathError("Collections config path must be an absolute path.")
@@ -38,7 +36,7 @@ class CollectionWatcher:
self._granule_watches = set()
- def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+ async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
"""
Periodically load the Collections Configuration file to check for changes,
and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +44,9 @@ class CollectionWatcher:
:return: None
"""
- self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+ await self._run_periodically(loop=loop,
+ wait_time=self._collections_refresh_interval,
+ func=self._reload_and_reschedule)
self._observer.start()
def collections(self) -> Set[Collection]:
@@ -99,11 +99,11 @@ class CollectionWatcher:
self._load_collections()
return self.collections() - old_collections
- def _reload_and_reschedule(self):
+ async def _reload_and_reschedule(self):
try:
updated_collections = self._get_updated_collections()
for collection in updated_collections:
- self._collection_updated_callback(collection)
+ await self._collection_updated_callback(collection)
if len(updated_collections) > 0:
self._unschedule_watches()
self._schedule_watches()
@@ -117,7 +117,9 @@ class CollectionWatcher:
def _schedule_watches(self):
for directory, collections in self._collections_by_dir.items():
- granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+ granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+ self._granule_updated_callback,
+ collections)
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
try:
@@ -127,18 +129,23 @@ class CollectionWatcher:
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@classmethod
- def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+ async def _run_periodically(cls,
+ loop: Optional[asyncio.AbstractEventLoop],
+ wait_time: float,
+ func: Callable[[any], Awaitable],
+ *args,
+ **kwargs):
"""
Call a function periodically. This uses asyncio, and is non-blocking.
:param loop: An optional event loop to use. If None, the current running event loop will be used.
:param wait_time: seconds to wait between iterations of func
- :param func: the function that will be run
+ :param func: the async function that will be awaited
:param args: any args that need to be provided to func
"""
if loop is None:
loop = asyncio.get_running_loop()
- func(*args)
- loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+ await func(*args, **kwargs)
+ loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,7 +153,11 @@ class _GranuleEventHandler(FileSystemEventHandler):
EventHandler that watches for new or modified granule files.
"""
- def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+ def __init__(self,
+ loop: asyncio.AbstractEventLoop,
+ callback: Callable[[str, Collection], Awaitable],
+ collections_for_dir: Set[Collection]):
+ self._loop = loop
self._callback = callback
self._collections_for_dir = collections_for_dir
@@ -154,7 +165,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
super().on_created(event)
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback(event.src_path, collection))
def on_modified(self, event):
super().on_modified(event)
@@ -163,4 +174,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
def __init__(self, host: str, username: str, password: str, queue: str):
self._connection_string = f"amqp://{username}:{password}@{host}/"
self._queue = queue
- self._channel = None
- self._connection = None
+ self._channel: Channel = None
+ self._connection: Connection = None
- def connect(self):
+ async def __aenter__(self):
+ await self._connect()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self._connection:
+ await self._connection.close()
+
+ async def _connect(self):
"""
Establish a connection to RabbitMQ.
:return: None
"""
- parameters = pika.URLParameters(self._connection_string)
- self._connection = pika.BlockingConnection(parameters)
- self._channel = self._connection.channel()
- self._channel.queue_declare(self._queue, durable=True)
+ self._connection = await connect_robust(self._connection_string)
+ self._channel = await self._connection.channel()
+ await self._channel.declare_queue(self._queue, durable=True)
- def publish_message(self, body: str, priority: int = None):
+ @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+ async def publish_message(self, body: str, priority: int = None):
"""
Publish a message to RabbitMQ using the optional message priority.
:param body: A string to publish to RabbitMQ
:param priority: An optional integer priority to use for the message
:return: None
"""
- properties = pika.BasicProperties(content_type='text/plain',
- delivery_mode=1,
- priority=priority)
- self._channel.basic_publish(exchange='',
- routing_key=self._queue,
- body=body,
- properties=properties)
-
- def __del__(self):
- if self._connection:
- self._connection.close()
+ message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+ await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..7e47c51 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -1,6 +1,7 @@
PyYAML==5.3.1
pystache==0.5.4
pysolr==3.8.1
-pika==1.1.0
watchdog==0.10.2
requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index 56d5393..aa143f3 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -6,6 +6,7 @@ from collection_manager.entities import Collection
from collection_manager.services import CollectionProcessor
from collection_manager.services.history_manager import FileIngestionHistoryBuilder
from collection_manager.services.history_manager import GranuleStatus
+from common.async_test_utils import AsyncMock, async_test
class TestCollectionProcessor(unittest.TestCase):
@@ -63,10 +64,11 @@ class TestCollectionProcessor(unittest.TestCase):
filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template)
self.assertEqual(filled, expected)
+ @async_test
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
- @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
- def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
+ @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+ async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_HISTORICAL
mock_history_builder.build.return_value = mock_history
@@ -79,15 +81,17 @@ class TestCollectionProcessor(unittest.TestCase):
date_from=None,
date_to=None)
- collection_processor.process_granule("test.nc", collection)
+ await collection_processor.process_granule("test.nc", collection)
mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
mock_history.push.assert_called()
+ @async_test
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
- @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
- def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, mock_history):
+ @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+ async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder,
+ mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
mock_history_builder.build.return_value = mock_history
@@ -100,15 +104,16 @@ class TestCollectionProcessor(unittest.TestCase):
date_from=None,
date_to=None)
- collection_processor.process_granule("test.h5", collection)
+ await collection_processor.process_granule("test.h5", collection)
mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=2)
mock_history.push.assert_called()
+ @async_test
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
- @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
- def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
+ @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+ async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
mock_history_builder.build.return_value = mock_history
@@ -121,15 +126,16 @@ class TestCollectionProcessor(unittest.TestCase):
date_from=None,
date_to=None)
- collection_processor.process_granule("test.h5", collection)
+ await collection_processor.process_granule("test.h5", collection)
mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
mock_history.push.assert_called()
+ @async_test
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
- @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
- def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
+ @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+ async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
mock_history_builder.build.return_value = mock_history
@@ -142,15 +148,16 @@ class TestCollectionProcessor(unittest.TestCase):
date_from=None,
date_to=None)
- collection_processor.process_granule("test.nc", collection)
+ await collection_processor.process_granule("test.nc", collection)
mock_publisher.publish_message.assert_not_called()
mock_history.push.assert_not_called()
+ @async_test
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
- @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
- def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history):
+ @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+ async def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history):
mock_history_builder.build.return_value = mock_history
collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
@@ -162,7 +169,7 @@ class TestCollectionProcessor(unittest.TestCase):
date_from=None,
date_to=None)
- collection_processor.process_granule("test.foo", collection)
+ await collection_processor.process_granule("test.foo", collection)
mock_publisher.publish_message.assert_not_called()
mock_history.push.assert_not_called()
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index b954812..14e7c3c 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -1,4 +1,3 @@
-import asyncio
import os
import tempfile
import unittest
@@ -9,6 +8,7 @@ from collection_manager.entities import Collection
from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \
RelativePathCollectionError, ConflictingPathCollectionError
from collection_manager.services import CollectionWatcher
+from common.async_test_utils.AsyncTestUtils import AsyncAssert, AsyncMock, async_test
class TestCollectionWatcher(unittest.TestCase):
@@ -30,7 +30,7 @@ class TestCollectionWatcher(unittest.TestCase):
def test_load_collections_loads_all_collections(self):
collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
- collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+ collection_watcher = CollectionWatcher(collections_path, AsyncMock(), AsyncMock())
collection_watcher._load_collections()
self.assertEqual(len(collection_watcher._collections_by_dir), 2)
@@ -120,7 +120,8 @@ class TestCollectionWatcher(unittest.TestCase):
date_to=None)
self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection)
- def test_collection_callback_is_called(self):
+ @async_test
+ async def test_collection_callback_is_called(self):
collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, delete=False)
granule_dir = tempfile.TemporaryDirectory()
collections_str = f"""collections:
@@ -131,14 +132,13 @@ class TestCollectionWatcher(unittest.TestCase):
forward-processing-priority: 5"""
collections_config.write(collections_str.encode("utf-8"))
- collection_callback = Mock()
+ collection_callback = AsyncMock()
collection_watcher = CollectionWatcher(collections_path=collections_config.name,
collection_updated_callback=collection_callback,
- granule_updated_callback=Mock(),
+ granule_updated_callback=AsyncMock(),
collections_refresh_interval=0.1)
- loop = asyncio.new_event_loop()
- collection_watcher.start_watching(loop)
+ await collection_watcher.start_watching()
collections_str = f"""
- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
@@ -149,14 +149,14 @@ class TestCollectionWatcher(unittest.TestCase):
"""
collections_config.write(collections_str.encode("utf-8"))
- loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2))
+ await AsyncAssert.assert_called_within_timeout(collection_callback, call_count=2)
- loop.close()
collections_config.close()
granule_dir.cleanup()
os.remove(collections_config.name)
- def test_granule_callback_is_called_on_new_file(self):
+ @async_test
+ async def test_granule_callback_is_called_on_new_file(self):
with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
granule_dir = tempfile.TemporaryDirectory()
collections_str = f"""
@@ -169,21 +169,18 @@ collections:
"""
collections_config.write(collections_str.encode("utf-8"))
- granule_callback = Mock()
- collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
-
- loop = asyncio.new_event_loop()
- collection_watcher.start_watching(loop)
+ granule_callback = AsyncMock()
+ collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback)
+ await collection_watcher.start_watching()
new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
+ await AsyncAssert.assert_called_within_timeout(granule_callback)
- loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
-
- loop.close()
new_granule.close()
granule_dir.cleanup()
- def test_granule_callback_is_called_on_modified_file(self):
+ @async_test
+ async def test_granule_callback_is_called_on_modified_file(self):
with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
granule_dir = tempfile.TemporaryDirectory()
collections_str = f"""
@@ -197,33 +194,20 @@ collections:
collections_config.write(collections_str.encode("utf-8"))
new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
- granule_callback = Mock()
- collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
+ granule_callback = AsyncMock()
+ collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback)
- loop = asyncio.new_event_loop()
- collection_watcher.start_watching(loop)
+ await collection_watcher.start_watching()
new_granule.write("hello world")
new_granule.close()
- loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
- loop.close()
+ await AsyncAssert.assert_called_within_timeout(granule_callback)
granule_dir.cleanup()
- def test_run_periodically(self):
- callback = Mock()
- loop = asyncio.new_event_loop()
- CollectionWatcher._run_periodically(loop, 0.1, callback)
- loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2))
- loop.close()
-
- @staticmethod
- async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
- start = datetime.now()
-
- while (datetime.now() - start).total_seconds() < timeout_sec:
- await asyncio.sleep(0.01)
- if mock_func.call_count >= call_count:
- return
- raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec")
+ @async_test
+ async def test_run_periodically(self):
+ callback = AsyncMock()
+ await CollectionWatcher._run_periodically(None, 0.1, callback)
+ await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)
diff --git a/common/common/__init__.py b/common/common/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/common/common/async_test_utils/AsyncTestUtils.py b/common/common/async_test_utils/AsyncTestUtils.py
new file mode 100644
index 0000000..ccb829b
--- /dev/null
+++ b/common/common/async_test_utils/AsyncTestUtils.py
@@ -0,0 +1,28 @@
+import asyncio
+from datetime import datetime
+from unittest import mock
+
+
+class AsyncMock(mock.MagicMock):
+ async def __call__(self, *args, **kwargs):
+ return super().__call__(*args, **kwargs)
+
+
+class AsyncAssert:
+ @staticmethod
+ async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+ start = datetime.now()
+
+ while (datetime.now() - start).total_seconds() < timeout_sec:
+ await asyncio.sleep(0.01)
+ if mock_func.call_count >= call_count:
+ return
+ raise AssertionError(f"Mock did not reach {call_count} calls called within {timeout_sec} sec")
+
+
+def async_test(coro):
+ def wrapper(*args, **kwargs):
+ loop = asyncio.new_event_loop()
+ return loop.run_until_complete(coro(*args, **kwargs))
+
+ return wrapper
diff --git a/common/common/async_test_utils/__init__.py b/common/common/async_test_utils/__init__.py
new file mode 100644
index 0000000..12563af
--- /dev/null
+++ b/common/common/async_test_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncTestUtils import AsyncMock, AsyncAssert, async_test
diff --git a/common/setup.py b/common/setup.py
new file mode 100644
index 0000000..ed621c8
--- /dev/null
+++ b/common/setup.py
@@ -0,0 +1,21 @@
+import re
+
+import setuptools
+
+PACKAGE_NAME = "sdap_ingester_common"
+
+setuptools.setup(
+ name=PACKAGE_NAME,
+ author="Apache - SDAP",
+ author_email="dev@sdap.apache.org",
+ description="a module of common functions for the sdap ingester components",
+ url="https://github.com/apache/incubator-sdap-ingester",
+ packages=setuptools.find_packages(),
+ classifiers=[
+ "Programming Language :: Python :: 3",
+ "Operating System :: OS Independent",
+ "Development Status :: 4 - Beta",
+ ],
+ python_requires='>=3.7',
+ include_package_data=True
+)
diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
deleted file mode 100644
index 5408b74..0000000
--- a/granule_ingester/.gitignore
+++ /dev/null
@@ -1,9 +0,0 @@
-.vscode
-.idea
-*.egg-info
-*__pycache__
-*.pytest_cache
-*.code-workspace
-.DS_STORE
-build
-dist
\ No newline at end of file
diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py
index 76b85ac..0e971ce 100644
--- a/granule_ingester/tests/writers/test_SolrStore.py
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -1,4 +1,3 @@
-import asyncio
import unittest
from nexusproto import DataTile_pb2 as nexusproto
@@ -43,8 +42,8 @@ class TestSolrStore(unittest.TestCase):
self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
- self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'])
- self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'])
+ self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'], delta=1E-5)
+ self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'], delta=1E-5)
self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])