You are viewing a plain-text version of this content. The canonical version is available via the original archive link, which is not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/28 22:09:16 UTC

[incubator-sdap-ingester] branch rabbitmq-fix updated: use asyncio in the collection ingester

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
     new 1a66bc1  use asyncio in the collection ingester
1a66bc1 is described below

commit 1a66bc11d0fe08cff2b5943ff581c0c414333aca
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 28 15:09:03 2020 -0700

    use asyncio in the collection ingester
---
 collection_manager/collection_manager/main.py      | 34 +++++++++---------
 .../services/CollectionProcessor.py                |  8 ++---
 .../services/CollectionWatcher.py                  | 34 ++++++++++--------
 .../services/MessagePublisher.py                   | 41 +++++++++++-----------
 collection_manager/requirements.txt                |  2 ++
 5 files changed, 63 insertions(+), 56 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 43b687e..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,25 +63,23 @@ async def main():
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
         else:
             history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
-        publisher = MessagePublisher(host=options.rabbitmq_host,
-                                     username=options.rabbitmq_username,
-                                     password=options.rabbitmq_password,
-                                     queue=options.rabbitmq_queue)
-        publisher.connect()
-        collection_processor = CollectionProcessor(message_publisher=publisher,
-                                                   history_manager_builder=history_manager_builder)
-        collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                               collection_updated_callback=collection_processor.process_collection,
-                                               granule_updated_callback=collection_processor.process_granule,
-                                               collections_refresh_interval=int(options.refresh))
+        async with MessagePublisher(host=options.rabbitmq_host,
+                                    username=options.rabbitmq_username,
+                                    password=options.rabbitmq_password,
+                                    queue=options.rabbitmq_queue) as publisher:
+            collection_processor = CollectionProcessor(message_publisher=publisher,
+                                                       history_manager_builder=history_manager_builder)
+            collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+                                                   collection_updated_callback=collection_processor.process_collection,
+                                                   granule_updated_callback=collection_processor.process_granule,
+                                                   collections_refresh_interval=int(options.refresh))
 
-        collection_watcher.start_watching()
-
-        while True:
-            try:
-                await asyncio.sleep(1)
-            except KeyboardInterrupt:
-                return
+            await collection_watcher.start_watching()
+            while True:
+                try:
+                    await asyncio.sleep(1)
+                except KeyboardInterrupt:
+                    return
 
     except Exception as e:
         logger.exception(e)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
         with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
             self._config_template = config_template_file.read()
 
-    def process_collection(self, collection: Collection):
+    async def process_collection(self, collection: Collection):
         """
         Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
         :param collection: A Collection definition
         :return: None
         """
         for granule in collection.files_owned():
-            self.process_granule(granule, collection)
+            await self.process_granule(granule, collection)
 
-    def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
-        self._publisher.publish_message(body=dataset_config, priority=use_priority)
+        await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         history_manager.push(granule)
 
     @staticmethod
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..0d7da84 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,6 @@ import asyncio
 import logging
 import os
 from collections import defaultdict
-from functools import partial
 from typing import Dict, Callable, Set, Optional
 
 import yaml
@@ -38,7 +37,7 @@ class CollectionWatcher:
 
         self._granule_watches = set()
 
-    def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+    async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
         """
         Periodically load the Collections Configuration file to check for changes,
         and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +45,7 @@ class CollectionWatcher:
         :return: None
         """
 
-        self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+        await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
         self._observer.start()
 
     def collections(self) -> Set[Collection]:
@@ -99,11 +98,11 @@ class CollectionWatcher:
         self._load_collections()
         return self.collections() - old_collections
 
-    def _reload_and_reschedule(self):
+    async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             for collection in updated_collections:
-                self._collection_updated_callback(collection)
+                await self._collection_updated_callback(collection)
             if len(updated_collections) > 0:
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -117,7 +116,9 @@ class CollectionWatcher:
 
     def _schedule_watches(self):
         for directory, collections in self._collections_by_dir.items():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+                                                         self._granule_updated_callback,
+                                                         collections)
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
@@ -127,18 +128,22 @@ class CollectionWatcher:
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
 
     @classmethod
-    def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                coro,
+                                *args):
         """
         Call a function periodically. This uses asyncio, and is non-blocking.
         :param loop: An optional event loop to use. If None, the current running event loop will be used.
         :param wait_time: seconds to wait between iterations of func
-        :param func: the function that will be run
+        :param coro: the coroutine that will be awaited
         :param args: any args that need to be provided to func
         """
         if loop is None:
             loop = asyncio.get_running_loop()
-        func(*args)
-        loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+        await coro(*args)
+        loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
 
 
 class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,15 +151,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
-        self._callback = callback
+    def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
+        self._loop = loop
+        self._callback_coro = callback_coro
         self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
 
     def on_modified(self, event):
         super().on_modified(event)
@@ -163,4 +169,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
-                self._callback(event.src_path, collection)
+                self._loop.create_task(self._callback_coro(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index f7a5517..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
 
 
 class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
     def __init__(self, host: str, username: str, password: str, queue: str):
         self._connection_string = f"amqp://{username}:{password}@{host}/"
         self._queue = queue
-        self._channel = None
-        self._connection = None
+        self._channel: Channel = None
+        self._connection: Connection = None
 
-    def connect(self):
+    async def __aenter__(self):
+        await self._connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._connection:
+            await self._connection.close()
+
+    async def _connect(self):
         """
         Establish a connection to RabbitMQ.
         :return: None
         """
-        parameters = pika.URLParameters(self._connection_string)
-        self._connection = pika.BlockingConnection(parameters)
-        self._channel = self._connection.channel()
-        self._channel.queue_declare(self._queue, durable=True)
+        self._connection = await connect_robust(self._connection_string)
+        self._channel = await self._connection.channel()
+        await self._channel.declare_queue(self._queue, durable=True)
 
-    def publish_message(self, body: str, priority: int = None):
+    @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+    async def publish_message(self, body: str, priority: int = None):
         """
         Publish a message to RabbitMQ using the optional message priority.
         :param body: A string to publish to RabbitMQ
         :param priority: An optional integer priority to use for the message
         :return: None
         """
-        properties = pika.BasicProperties(content_type='text/plain',
-                                          delivery_mode=2,
-                                          priority=priority)
-        self._channel.basic_publish(exchange='',
-                                    routing_key=self._queue,
-                                    body=body,
-                                    properties=properties)
-
-    def __del__(self):
-        if self._connection:
-            self._connection.close()
+        message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+        await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..47ae867 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,5 @@ pysolr==3.8.1
 pika==1.1.0
 watchdog==0.10.2
 requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file