Posted to commits@sdap.apache.org by tl...@apache.org on 2021/04/09 01:17:26 UTC

[incubator-sdap-ingester] 07/33: SDAP-269: Switch to using aio-pika in collection-manager to maintain an asynchronous connection to RabbitMQ (#7)

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit d748429329084cb19966c80393acaa4e53162391
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Aug 3 12:51:56 2020 -0700

    SDAP-269: Switch to using aio-pika in collection-manager to maintain an asynchronous connection to RabbitMQ (#7)
    
    Co-authored-by: Eamon Ford <ea...@jpl.nasa.gov>
---
 .gitignore                                         |  4 +-
 collection_manager/README.md                       |  7 ++-
 collection_manager/collection_manager/main.py      | 36 ++++++------
 .../services/CollectionProcessor.py                |  8 +--
 .../services/CollectionWatcher.py                  | 45 +++++++++------
 .../services/MessagePublisher.py                   | 41 +++++++-------
 collection_manager/requirements.txt                |  3 +-
 .../tests/services/test_CollectionProcessor.py     | 37 +++++++-----
 .../tests/services/test_CollectionWatcher.py       | 66 ++++++++--------------
 common/common/__init__.py                          |  0
 common/common/async_test_utils/AsyncTestUtils.py   | 28 +++++++++
 common/common/async_test_utils/__init__.py         |  1 +
 common/setup.py                                    | 21 +++++++
 granule_ingester/.gitignore                        |  9 ---
 granule_ingester/tests/writers/test_SolrStore.py   |  5 +-
 15 files changed, 178 insertions(+), 133 deletions(-)
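
For context, the heart of the change is swapping pika's blocking connection for aio-pika's coroutine-based one. A minimal sketch of the two connection styles, with an illustrative queue name and broker URL (not the commit's configuration):

```python
import pika                            # old client: synchronous, blocks the thread
from aio_pika import connect_robust    # new client: asyncio-native

def connect_blocking(url: str):
    # pika: every broker round-trip blocks until it completes
    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    channel.queue_declare("example_queue", durable=True)
    return connection

async def connect_async(url: str):
    # aio-pika: awaitable, and connect_robust re-establishes dropped connections
    connection = await connect_robust(url)
    channel = await connection.channel()
    await channel.declare_queue("example_queue", durable=True)
    return connection
```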

diff --git a/.gitignore b/.gitignore
index e7c73b8..46e4151 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,6 @@
 credentials.json
 token.pickle
 tmp
-sdap_ingest_manager.egg-info
 venv
 __pycache__/
 dist/
@@ -11,3 +10,6 @@ build/
 *.DS_Store
 .eggs
 temp/
+*.pyc
+*.vscode
+*.code-workspace
\ No newline at end of file
diff --git a/collection_manager/README.md b/collection_manager/README.md
index 9d00cbb..771f355 100644
--- a/collection_manager/README.md
+++ b/collection_manager/README.md
@@ -64,10 +64,11 @@ collections:
 
 ```
 ## Running the tests
-From `incubator-sdap-ingester/collection_manager`, run:
+From `incubator-sdap-ingester/`, run:
 
-    $ pip install pytest
-    $ pytest
+    $ cd common && python setup.py install
+    $ cd ../collection_manager && python setup.py install
+    $ pip install pytest && pytest
     
 ## Building the Docker image
 From `incubator-sdap-ingester/collection_manager`, run:
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 7e72de5..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,28 +63,26 @@ async def main():
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
         else:
             history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
-        publisher = MessagePublisher(host=options.rabbitmq_host,
-                                     username=options.rabbitmq_username,
-                                     password=options.rabbitmq_password,
-                                     queue=options.rabbitmq_queue)
-        publisher.connect()
-        collection_processor = CollectionProcessor(message_publisher=publisher,
-                                                   history_manager_builder=history_manager_builder)
-        collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                               collection_updated_callback=collection_processor.process_collection,
-                                               granule_updated_callback=collection_processor.process_granule,
-                                               collections_refresh_interval=int(options.refresh))
+        async with MessagePublisher(host=options.rabbitmq_host,
+                                    username=options.rabbitmq_username,
+                                    password=options.rabbitmq_password,
+                                    queue=options.rabbitmq_queue) as publisher:
+            collection_processor = CollectionProcessor(message_publisher=publisher,
+                                                       history_manager_builder=history_manager_builder)
+            collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+                                                   collection_updated_callback=collection_processor.process_collection,
+                                                   granule_updated_callback=collection_processor.process_granule,
+                                                   collections_refresh_interval=int(options.refresh))
 
-        collection_watcher.start_watching()
-
-        while True:
-            try:
-                await asyncio.sleep(1)
-            except KeyboardInterrupt:
-                return
+            await collection_watcher.start_watching()
+            while True:
+                try:
+                    await asyncio.sleep(1)
+                except KeyboardInterrupt:
+                    return
 
     except Exception as e:
-        logger.error(e)
+        logger.exception(e)
         return
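
The rewritten main() scopes the publisher in an `async with` block, so the connection is closed even when the watch loop exits on an error. A minimal sketch of the async context manager protocol that MessagePublisher now implements (illustrative class and prints, not the commit's code):

```python
import asyncio

class Managed:
    async def __aenter__(self):
        print("open")    # e.g. establish the broker connection
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("close")   # runs on normal exit and when the body raises

async def main():
    async with Managed() as resource:
        print("working with", resource)

asyncio.run(main())
```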
 
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
         with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
             self._config_template = config_template_file.read()
 
-    def process_collection(self, collection: Collection):
+    async def process_collection(self, collection: Collection):
         """
         Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
         :param collection: A Collection definition
         :return: None
         """
         for granule in collection.files_owned():
-            self.process_granule(granule, collection)
+            await self.process_granule(granule, collection)
 
-    def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
-        self._publisher.publish_message(body=dataset_config, priority=use_priority)
+        await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         history_manager.push(granule)
 
     @staticmethod
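
With process_collection and process_granule now coroutines, granules within a collection are awaited one at a time, which preserves publish order. If ordering did not matter, the same loop could fan out with asyncio.gather; a sketch of both patterns under that assumption (illustrative names, a sleep standing in for the publish):

```python
import asyncio

async def publish(granule: str) -> None:
    await asyncio.sleep(0.01)   # stand-in for an async RabbitMQ publish
    print("published", granule)

async def sequential(granules) -> None:
    for granule in granules:    # the commit's pattern: in order, one at a time
        await publish(granule)

async def concurrent(granules) -> None:
    await asyncio.gather(*(publish(g) for g in granules))  # all in flight at once

asyncio.run(sequential(["a.nc", "b.nc"]))
```
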
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..8911806 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,9 +2,7 @@ import asyncio
 import logging
 import os
 from collections import defaultdict
-from functools import partial
-from typing import Dict, Callable, Set, Optional
-
+from typing import Dict, Callable, Set, Optional, Awaitable
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
@@ -22,8 +20,8 @@ logger.setLevel(logging.DEBUG)
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], any],
-                 granule_updated_callback: Callable[[str, Collection], any],
+                 collection_updated_callback: Callable[[Collection], Awaitable],
+                 granule_updated_callback: Callable[[str, Collection], Awaitable],
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -38,7 +36,7 @@ class CollectionWatcher:
 
         self._granule_watches = set()
 
-    def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+    async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
         """
         Periodically load the Collections Configuration file to check for changes,
         and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +44,9 @@ class CollectionWatcher:
         :return: None
         """
 
-        self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+        await self._run_periodically(loop=loop,
+                                     wait_time=self._collections_refresh_interval,
+                                     func=self._reload_and_reschedule)
         self._observer.start()
 
     def collections(self) -> Set[Collection]:
@@ -99,11 +99,11 @@ class CollectionWatcher:
         self._load_collections()
         return self.collections() - old_collections
 
-    def _reload_and_reschedule(self):
+    async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             for collection in updated_collections:
-                self._collection_updated_callback(collection)
+                await self._collection_updated_callback(collection)
             if len(updated_collections) > 0:
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -117,7 +117,9 @@ class CollectionWatcher:
 
     def _schedule_watches(self):
         for directory, collections in self._collections_by_dir.items():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+                                                         self._granule_updated_callback,
+                                                         collections)
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
@@ -127,18 +129,23 @@ class CollectionWatcher:
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
     @classmethod
-    def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[[any], Awaitable],
+                                *args,
+                                **kwargs):
         """
         Call a function periodically. This uses asyncio, and is non-blocking.
         :param loop: An optional event loop to use. If None, the current running event loop will be used.
         :param wait_time: seconds to wait between iterations of func
-        :param func: the function that will be run
+        :param func: the async function that will be awaited
         :param args: any args that need to be provided to func
         """
         if loop is None:
             loop = asyncio.get_running_loop()
-        func(*args)
-        loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,7 +153,11 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+    def __init__(self,
+                 loop: asyncio.AbstractEventLoop,
+                 callback: Callable[[str, Collection], Awaitable],
+                 collections_for_dir: Set[Collection]):
+        self._loop = loop
         self._callback = callback
         self._collections_for_dir = collections_for_dir
 
@@ -154,7 +165,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
         super().on_created(event)
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback(event.src_path, collection))
 
     def on_modified(self, event):
         super().on_modified(event)
@@ -163,4 +174,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback(event.src_path, collection))
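
Two asyncio idioms appear in this file: _run_periodically re-arms itself through loop.call_later, handing the next iteration's coroutine to loop.create_task, and _GranuleEventHandler wraps its async callback in create_task because watchdog invokes handlers synchronously. A runnable sketch of the rescheduling idea, with illustrative names and a shortened interval:

```python
import asyncio

async def tick(label: str) -> None:
    print("tick", label)

def run_periodically(loop: asyncio.AbstractEventLoop, wait: float, func, *args) -> None:
    loop.create_task(func(*args))                                     # run one iteration now
    loop.call_later(wait, run_periodically, loop, wait, func, *args)  # re-arm for the next

async def main() -> None:
    run_periodically(asyncio.get_running_loop(), 0.5, tick, "heartbeat")
    await asyncio.sleep(1.8)   # let a few iterations fire

asyncio.run(main())
```

One caveat worth knowing: watchdog dispatches events on its own observer thread, and loop.call_soon_threadsafe is the documented way to hand work to an event loop from another thread.
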
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
 
 
 class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
     def __init__(self, host: str, username: str, password: str, queue: str):
         self._connection_string = f"amqp://{username}:{password}@{host}/"
         self._queue = queue
-        self._channel = None
-        self._connection = None
+        self._channel: Channel = None
+        self._connection: Connection = None
 
-    def connect(self):
+    async def __aenter__(self):
+        await self._connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._connection:
+            await self._connection.close()
+
+    async def _connect(self):
         """
         Establish a connection to RabbitMQ.
         :return: None
         """
-        parameters = pika.URLParameters(self._connection_string)
-        self._connection = pika.BlockingConnection(parameters)
-        self._channel = self._connection.channel()
-        self._channel.queue_declare(self._queue, durable=True)
+        self._connection = await connect_robust(self._connection_string)
+        self._channel = await self._connection.channel()
+        await self._channel.declare_queue(self._queue, durable=True)
 
-    def publish_message(self, body: str, priority: int = None):
+    @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+    async def publish_message(self, body: str, priority: int = None):
         """
         Publish a message to RabbitMQ using the optional message priority.
         :param body: A string to publish to RabbitMQ
         :param priority: An optional integer priority to use for the message
         :return: None
         """
-        properties = pika.BasicProperties(content_type='text/plain',
-                                          delivery_mode=1,
-                                          priority=priority)
-        self._channel.basic_publish(exchange='',
-                                    routing_key=self._queue,
-                                    body=body,
-                                    properties=properties)
-
-    def __del__(self):
-        if self._connection:
-            self._connection.close()
+        message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+        await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
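
publish_message is now guarded by tenacity's @retry: four attempts, five seconds apart, with the final error re-raised (reraise=True). Note the delivery mode also changes here, from pika's transient delivery_mode=1 to DeliveryMode.PERSISTENT. A self-contained sketch of the same decorator on an async publish, assuming a reachable broker (URL and queue name are illustrative):

```python
import asyncio

from aio_pika import DeliveryMode, Message, connect_robust
from tenacity import retry, stop_after_attempt, wait_fixed

@retry(wait=wait_fixed(5), stop=stop_after_attempt(4), reraise=True)
async def publish(url: str, queue: str, body: str) -> None:
    connection = await connect_robust(url)
    try:
        channel = await connection.channel()
        await channel.declare_queue(queue, durable=True)
        await channel.default_exchange.publish(
            Message(body=body.encode("utf-8"), delivery_mode=DeliveryMode.PERSISTENT),
            routing_key=queue)
    finally:
        await connection.close()

# asyncio.run(publish("amqp://guest:guest@localhost/", "example_queue", "hello"))
```
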
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..7e47c51 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -1,6 +1,7 @@
 PyYAML==5.3.1
 pystache==0.5.4
 pysolr==3.8.1
-pika==1.1.0
 watchdog==0.10.2
 requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index 56d5393..aa143f3 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -6,6 +6,7 @@ from collection_manager.entities import Collection
 from collection_manager.services import CollectionProcessor
 from collection_manager.services.history_manager import FileIngestionHistoryBuilder
 from collection_manager.services.history_manager import GranuleStatus
+from common.async_test_utils import AsyncMock, async_test
 
 
 class TestCollectionProcessor(unittest.TestCase):
@@ -63,10 +64,11 @@ class TestCollectionProcessor(unittest.TestCase):
         filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template)
         self.assertEqual(filled, expected)
 
+    @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
-    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
-    def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
+    @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+    async def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_HISTORICAL
         mock_history_builder.build.return_value = mock_history
 
@@ -79,15 +81,17 @@ class TestCollectionProcessor(unittest.TestCase):
                                 date_from=None,
                                 date_to=None)
 
-        collection_processor.process_granule("test.nc", collection)
+        await collection_processor.process_granule("test.nc", collection)
 
         mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
         mock_history.push.assert_called()
 
+    @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
-    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
-    def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, mock_history):
+    @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+    async def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder,
+                                                                   mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
 
@@ -100,15 +104,16 @@ class TestCollectionProcessor(unittest.TestCase):
                                 date_from=None,
                                 date_to=None)
 
-        collection_processor.process_granule("test.h5", collection)
+        await collection_processor.process_granule("test.h5", collection)
 
         mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=2)
         mock_history.push.assert_called()
 
+    @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
-    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
-    def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
+    @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+    async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
                                                                              mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
@@ -121,15 +126,16 @@ class TestCollectionProcessor(unittest.TestCase):
                                 date_from=None,
                                 date_to=None)
 
-        collection_processor.process_granule("test.h5", collection)
+        await collection_processor.process_granule("test.h5", collection)
 
         mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
         mock_history.push.assert_called()
 
+    @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
-    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
-    def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
+    @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+    async def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
         mock_history_builder.build.return_value = mock_history
 
@@ -142,15 +148,16 @@ class TestCollectionProcessor(unittest.TestCase):
                                 date_from=None,
                                 date_to=None)
 
-        collection_processor.process_granule("test.nc", collection)
+        await collection_processor.process_granule("test.nc", collection)
 
         mock_publisher.publish_message.assert_not_called()
         mock_history.push.assert_not_called()
 
+    @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
-    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
-    def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history):
+    @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
+    async def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history):
         mock_history_builder.build.return_value = mock_history
 
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
@@ -162,7 +169,7 @@ class TestCollectionProcessor(unittest.TestCase):
                                 date_from=None,
                                 date_to=None)
 
-        collection_processor.process_granule("test.foo", collection)
+        await collection_processor.process_granule("test.foo", collection)
 
         mock_publisher.publish_message.assert_not_called()
         mock_history.push.assert_not_called()
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index b954812..14e7c3c 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -1,4 +1,3 @@
-import asyncio
 import os
 import tempfile
 import unittest
@@ -9,6 +8,7 @@ from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \
     RelativePathCollectionError, ConflictingPathCollectionError
 from collection_manager.services import CollectionWatcher
+from common.async_test_utils.AsyncTestUtils import AsyncAssert, AsyncMock, async_test
 
 
 class TestCollectionWatcher(unittest.TestCase):
@@ -30,7 +30,7 @@ class TestCollectionWatcher(unittest.TestCase):
 
     def test_load_collections_loads_all_collections(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
-        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+        collection_watcher = CollectionWatcher(collections_path, AsyncMock(), AsyncMock())
         collection_watcher._load_collections()
 
         self.assertEqual(len(collection_watcher._collections_by_dir), 2)
@@ -120,7 +120,8 @@ class TestCollectionWatcher(unittest.TestCase):
                                 date_to=None)
         self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection)
 
-    def test_collection_callback_is_called(self):
+    @async_test
+    async def test_collection_callback_is_called(self):
         collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, delete=False)
         granule_dir = tempfile.TemporaryDirectory()
         collections_str = f"""collections:
@@ -131,14 +132,13 @@ class TestCollectionWatcher(unittest.TestCase):
   forward-processing-priority: 5"""
         collections_config.write(collections_str.encode("utf-8"))
 
-        collection_callback = Mock()
+        collection_callback = AsyncMock()
         collection_watcher = CollectionWatcher(collections_path=collections_config.name,
                                                collection_updated_callback=collection_callback,
-                                               granule_updated_callback=Mock(),
+                                               granule_updated_callback=AsyncMock(),
                                                collections_refresh_interval=0.1)
 
-        loop = asyncio.new_event_loop()
-        collection_watcher.start_watching(loop)
+        await collection_watcher.start_watching()
 
         collections_str = f"""
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
@@ -149,14 +149,14 @@ class TestCollectionWatcher(unittest.TestCase):
         """
         collections_config.write(collections_str.encode("utf-8"))
 
-        loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2))
+        await AsyncAssert.assert_called_within_timeout(collection_callback, call_count=2)
 
-        loop.close()
         collections_config.close()
         granule_dir.cleanup()
         os.remove(collections_config.name)
 
-    def test_granule_callback_is_called_on_new_file(self):
+    @async_test
+    async def test_granule_callback_is_called_on_new_file(self):
         with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
             granule_dir = tempfile.TemporaryDirectory()
             collections_str = f"""
@@ -169,21 +169,18 @@ collections:
             """
             collections_config.write(collections_str.encode("utf-8"))
 
-            granule_callback = Mock()
-            collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
-
-            loop = asyncio.new_event_loop()
-            collection_watcher.start_watching(loop)
+            granule_callback = AsyncMock()
+            collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback)
 
+            await collection_watcher.start_watching()
             new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
+            await AsyncAssert.assert_called_within_timeout(granule_callback)
 
-            loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
-
-            loop.close()
             new_granule.close()
             granule_dir.cleanup()
 
-    def test_granule_callback_is_called_on_modified_file(self):
+    @async_test
+    async def test_granule_callback_is_called_on_modified_file(self):
         with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
             granule_dir = tempfile.TemporaryDirectory()
             collections_str = f"""
@@ -197,33 +194,20 @@ collections:
             collections_config.write(collections_str.encode("utf-8"))
             new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
 
-            granule_callback = Mock()
-            collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
+            granule_callback = AsyncMock()
+            collection_watcher = CollectionWatcher(collections_config.name, AsyncMock(), granule_callback)
 
-            loop = asyncio.new_event_loop()
-            collection_watcher.start_watching(loop)
+            await collection_watcher.start_watching()
 
             new_granule.write("hello world")
             new_granule.close()
 
-            loop.run_until_complete(self.assert_called_within_timeout(granule_callback))
-            loop.close()
+            await AsyncAssert.assert_called_within_timeout(granule_callback)
             granule_dir.cleanup()
 
-    def test_run_periodically(self):
-        callback = Mock()
-        loop = asyncio.new_event_loop()
-        CollectionWatcher._run_periodically(loop, 0.1, callback)
-        loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2))
-        loop.close()
-
-    @staticmethod
-    async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
-        start = datetime.now()
-
-        while (datetime.now() - start).total_seconds() < timeout_sec:
-            await asyncio.sleep(0.01)
-            if mock_func.call_count >= call_count:
-                return
-        raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec")
+    @async_test
+    async def test_run_periodically(self):
+        callback = AsyncMock()
+        await CollectionWatcher._run_periodically(None, 0.1, callback)
+        await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)
 
diff --git a/common/common/__init__.py b/common/common/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/common/common/async_test_utils/AsyncTestUtils.py b/common/common/async_test_utils/AsyncTestUtils.py
new file mode 100644
index 0000000..ccb829b
--- /dev/null
+++ b/common/common/async_test_utils/AsyncTestUtils.py
@@ -0,0 +1,28 @@
+import asyncio
+from datetime import datetime
+from unittest import mock
+
+
+class AsyncMock(mock.MagicMock):
+    async def __call__(self, *args, **kwargs):
+        return super().__call__(*args, **kwargs)
+
+
+class AsyncAssert:
+    @staticmethod
+    async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+        start = datetime.now()
+
+        while (datetime.now() - start).total_seconds() < timeout_sec:
+            await asyncio.sleep(0.01)
+            if mock_func.call_count >= call_count:
+                return
+        raise AssertionError(f"Mock did not reach {call_count} calls called within {timeout_sec} sec")
+
+
+def async_test(coro):
+    def wrapper(*args, **kwargs):
+        loop = asyncio.new_event_loop()
+        return loop.run_until_complete(coro(*args, **kwargs))
+
+    return wrapper
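
AsyncMock here is a MagicMock whose __call__ is a coroutine, so tests can await it while it still records calls, and async_test runs a coroutine test method on a fresh event loop. Python 3.8 later added unittest.mock.AsyncMock and unittest.IsolatedAsyncioTestCase to the standard library, but this repository targets 3.7, hence the shim. A usage sketch (hypothetical test; the imports mirror this module's __init__.py):

```python
import unittest

from common.async_test_utils import AsyncAssert, AsyncMock, async_test

class ExampleTest(unittest.TestCase):
    @async_test
    async def test_callback_is_awaited_and_recorded(self):
        callback = AsyncMock()
        await callback("granule.nc")                    # awaitable, like a coroutine
        callback.assert_called_once_with("granule.nc")  # records calls, like a MagicMock
        await AsyncAssert.assert_called_within_timeout(callback, timeout_sec=0.1)

if __name__ == "__main__":
    unittest.main()
```
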
diff --git a/common/common/async_test_utils/__init__.py b/common/common/async_test_utils/__init__.py
new file mode 100644
index 0000000..12563af
--- /dev/null
+++ b/common/common/async_test_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncTestUtils import AsyncMock, AsyncAssert, async_test
diff --git a/common/setup.py b/common/setup.py
new file mode 100644
index 0000000..ed621c8
--- /dev/null
+++ b/common/setup.py
@@ -0,0 +1,21 @@
+import re
+
+import setuptools
+
+PACKAGE_NAME = "sdap_ingester_common"
+
+setuptools.setup(
+    name=PACKAGE_NAME,
+    author="Apache - SDAP",
+    author_email="dev@sdap.apache.org",
+    description="a module of common functions for the sdap ingester components",
+    url="https://github.com/apache/incubator-sdap-ingester",
+    packages=setuptools.find_packages(),
+    classifiers=[
+        "Programming Language :: Python :: 3",
+        "Operating System :: OS Independent",
+        "Development Status :: 4 - Beta",
+    ],
+    python_requires='>=3.7',
+    include_package_data=True
+)
diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
deleted file mode 100644
index 5408b74..0000000
--- a/granule_ingester/.gitignore
+++ /dev/null
@@ -1,9 +0,0 @@
-.vscode
-.idea
-*.egg-info
-*__pycache__
-*.pytest_cache
-*.code-workspace
-.DS_STORE
-build
-dist
\ No newline at end of file
diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py
index 76b85ac..0e971ce 100644
--- a/granule_ingester/tests/writers/test_SolrStore.py
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -1,4 +1,3 @@
-import asyncio
 import unittest
 
 from nexusproto import DataTile_pb2 as nexusproto
@@ -43,8 +42,8 @@ class TestSolrStore(unittest.TestCase):
         self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
         self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
         self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
-        self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'])
-        self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'])
+        self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'], delta=1E-5)
+        self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'], delta=1E-5)
         self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
         self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
         self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
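
The loosened latitude assertions replace assertAlmostEqual's default check, round(a - b, 7) == 0, with an explicit absolute tolerance, presumably because the tile coordinates round-trip through 32-bit floats, which can shift values near ±180 by roughly 1e-5 and fail the seven-decimal-place default. A small illustration of the two modes:

```python
import unittest

class ToleranceExample(unittest.TestCase):
    def test_places_vs_delta(self):
        self.assertAlmostEqual(1.0, 1.0 + 1e-8)                  # default: 7 decimal places
        self.assertAlmostEqual(180.2, 180.2000099, delta=1e-5)   # explicit absolute tolerance

if __name__ == "__main__":
    unittest.main()
```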