Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/28 22:09:16 UTC
[incubator-sdap-ingester] branch rabbitmq-fix updated: use asyncio in the collection ingester
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/rabbitmq-fix by this push:
new 1a66bc1 use asyncio in the collection ingester
1a66bc1 is described below
commit 1a66bc11d0fe08cff2b5943ff581c0c414333aca
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 28 15:09:03 2020 -0700
use asyncio in the collection ingester
---
collection_manager/collection_manager/main.py | 34 +++++++++---------
.../services/CollectionProcessor.py | 8 ++---
.../services/CollectionWatcher.py | 34 ++++++++++--------
.../services/MessagePublisher.py | 41 +++++++++++-----------
collection_manager/requirements.txt | 2 ++
5 files changed, 63 insertions(+), 56 deletions(-)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 43b687e..cbe22f9 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -63,25 +63,23 @@ async def main():
history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
else:
history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
- publisher = MessagePublisher(host=options.rabbitmq_host,
- username=options.rabbitmq_username,
- password=options.rabbitmq_password,
- queue=options.rabbitmq_queue)
- publisher.connect()
- collection_processor = CollectionProcessor(message_publisher=publisher,
- history_manager_builder=history_manager_builder)
- collection_watcher = CollectionWatcher(collections_path=options.collections_path,
- collection_updated_callback=collection_processor.process_collection,
- granule_updated_callback=collection_processor.process_granule,
- collections_refresh_interval=int(options.refresh))
+ async with MessagePublisher(host=options.rabbitmq_host,
+ username=options.rabbitmq_username,
+ password=options.rabbitmq_password,
+ queue=options.rabbitmq_queue) as publisher:
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+ history_manager_builder=history_manager_builder)
+ collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+ collection_updated_callback=collection_processor.process_collection,
+ granule_updated_callback=collection_processor.process_granule,
+ collections_refresh_interval=int(options.refresh))
- collection_watcher.start_watching()
-
- while True:
- try:
- await asyncio.sleep(1)
- except KeyboardInterrupt:
- return
+ await collection_watcher.start_watching()
+ while True:
+ try:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ return
except Exception as e:
logger.exception(e)
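[Editor's note] The main() rewrite leans on MessagePublisher implementing the async context manager protocol, so the RabbitMQ connection is closed even if the watch loop raises. A minimal, self-contained sketch of the same pattern follows; DummyPublisher is a placeholder used for illustration, not the project's class:

    import asyncio

    class DummyPublisher:
        """Stand-in publisher showing the async context manager shape."""

        async def __aenter__(self):
            # a real implementation would open the AMQP connection here
            print("connected")
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # runs on normal exit and when the body raises, so the
            # connection is always released
            print("closed")

        async def publish_message(self, body: str):
            print(f"published: {body}")

    async def main():
        async with DummyPublisher() as publisher:
            await publisher.publish_message("hello")
            await asyncio.sleep(1)  # stand-in for the periodic watch loop

    if __name__ == "__main__":
        asyncio.run(main())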
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 232cdee..d790f4b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -25,16 +25,16 @@ class CollectionProcessor:
with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
self._config_template = config_template_file.read()
- def process_collection(self, collection: Collection):
+ async def process_collection(self, collection: Collection):
"""
Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
:param collection: A Collection definition
:return: None
"""
for granule in collection.files_owned():
- self.process_granule(granule, collection)
+ await self.process_granule(granule, collection)
- def process_granule(self, granule: str, collection: Collection):
+ async def process_granule(self, granule: str, collection: Collection):
"""
Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
:param granule: A path to a granule file
@@ -64,7 +64,7 @@ class CollectionProcessor:
return
dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
- self._publisher.publish_message(body=dataset_config, priority=use_priority)
+ await self._publisher.publish_message(body=dataset_config, priority=use_priority)
history_manager.push(granule)
@staticmethod
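[Editor's note] The CollectionProcessor change is mechanical: once publish_message is a coroutine, every caller up the chain becomes async and awaits it, so a slow broker naturally slows granule processing instead of blocking a thread. A hedged sketch of that propagation with simplified stand-in types (FakePublisher and Processor are illustrative, not the project's API):

    import asyncio
    from typing import List

    class FakePublisher:
        async def publish_message(self, body: str, priority: int = None):
            await asyncio.sleep(0)  # stands in for network I/O
            print(f"sent (priority={priority}): {body}")

    class Processor:
        def __init__(self, publisher: FakePublisher):
            self._publisher = publisher

        async def process_granule(self, granule: str):
            # each publish is awaited before moving to the next granule
            await self._publisher.publish_message(body=granule, priority=1)

        async def process_collection(self, granules: List[str]):
            for granule in granules:
                await self.process_granule(granule)

    asyncio.run(Processor(FakePublisher()).process_collection(["a.nc", "b.nc"]))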
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 2387016..0d7da84 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -2,7 +2,6 @@ import asyncio
import logging
import os
from collections import defaultdict
-from functools import partial
from typing import Dict, Callable, Set, Optional
import yaml
@@ -38,7 +37,7 @@ class CollectionWatcher:
self._granule_watches = set()
- def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
+ async def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None):
"""
Periodically load the Collections Configuration file to check for changes,
and observe filesystem events for added/modified granules. When an event occurs,
@@ -46,7 +45,7 @@ class CollectionWatcher:
:return: None
"""
- self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
+ await self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule)
self._observer.start()
def collections(self) -> Set[Collection]:
@@ -99,11 +98,11 @@ class CollectionWatcher:
self._load_collections()
return self.collections() - old_collections
- def _reload_and_reschedule(self):
+ async def _reload_and_reschedule(self):
try:
updated_collections = self._get_updated_collections()
for collection in updated_collections:
- self._collection_updated_callback(collection)
+ await self._collection_updated_callback(collection)
if len(updated_collections) > 0:
self._unschedule_watches()
self._schedule_watches()
@@ -117,7 +116,9 @@ class CollectionWatcher:
def _schedule_watches(self):
for directory, collections in self._collections_by_dir.items():
- granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+ granule_event_handler = _GranuleEventHandler(asyncio.get_running_loop(),
+ self._granule_updated_callback,
+ collections)
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
try:
@@ -127,18 +128,22 @@ class CollectionWatcher:
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@classmethod
- def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args):
+ async def _run_periodically(cls,
+ loop: Optional[asyncio.AbstractEventLoop],
+ wait_time: float,
+ coro,
+ *args):
"""
Call a function periodically. This uses asyncio, and is non-blocking.
:param loop: An optional event loop to use. If None, the current running event loop will be used.
:param wait_time: seconds to wait between iterations of func
- :param func: the function that will be run
+ :param coro: the coroutine that will be awaited
:param args: any args that need to be provided to func
"""
if loop is None:
loop = asyncio.get_running_loop()
- func(*args)
- loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func))
+ await coro(*args)
+ loop.call_later(wait_time, asyncio.create_task, cls._run_periodically(loop, wait_time, coro))
class _GranuleEventHandler(FileSystemEventHandler):
@@ -146,15 +151,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
EventHandler that watches for new or modified granule files.
"""
- def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
- self._callback = callback
+ def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro, collections_for_dir: Set[Collection]):
+ self._loop = loop
+ self._callback_coro = callback_coro
self._collections_for_dir = collections_for_dir
def on_created(self, event):
super().on_created(event)
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback_coro(event.src_path, collection))
def on_modified(self, event):
super().on_modified(event)
@@ -163,4 +169,4 @@ class _GranuleEventHandler(FileSystemEventHandler):
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
- self._callback(event.src_path, collection)
+ self._loop.create_task(self._callback_coro(event.src_path, collection))
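[Editor's note] _GranuleEventHandler now holds a reference to the event loop and schedules the granule callback as a task instead of calling it synchronously. One caveat worth noting: loop.create_task is only safe from the loop's own thread, while watchdog observers typically dispatch events on a separate thread; asyncio.run_coroutine_threadsafe is the thread-safe equivalent in that case. A minimal sketch of such a bridge (handler and callback names are illustrative, not the committed code):

    import asyncio
    from watchdog.events import FileSystemEventHandler

    class AsyncEventHandler(FileSystemEventHandler):
        """Bridges watchdog's threaded callbacks into an asyncio loop."""

        def __init__(self, loop: asyncio.AbstractEventLoop, callback_coro):
            self._loop = loop
            self._callback_coro = callback_coro

        def on_created(self, event):
            # watchdog invokes this from the observer thread, so the coroutine
            # is handed to the loop with run_coroutine_threadsafe rather than
            # loop.create_task
            asyncio.run_coroutine_threadsafe(
                self._callback_coro(event.src_path), self._loop)

Scheduling is unchanged from the diff: the handler is passed to observer.schedule() on a watchdog Observer while the asyncio loop keeps running in the main thread.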
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index f7a5517..75803d1 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -1,4 +1,5 @@
-import pika
+from aio_pika import Message, DeliveryMode, Connection, Channel, connect_robust
+from tenacity import retry, stop_after_attempt, wait_fixed
class MessagePublisher:
@@ -6,34 +7,34 @@ class MessagePublisher:
def __init__(self, host: str, username: str, password: str, queue: str):
self._connection_string = f"amqp://{username}:{password}@{host}/"
self._queue = queue
- self._channel = None
- self._connection = None
+ self._channel: Channel = None
+ self._connection: Connection = None
- def connect(self):
+ async def __aenter__(self):
+ await self._connect()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self._connection:
+ await self._connection.close()
+
+ async def _connect(self):
"""
Establish a connection to RabbitMQ.
:return: None
"""
- parameters = pika.URLParameters(self._connection_string)
- self._connection = pika.BlockingConnection(parameters)
- self._channel = self._connection.channel()
- self._channel.queue_declare(self._queue, durable=True)
+ self._connection = await connect_robust(self._connection_string)
+ self._channel = await self._connection.channel()
+ await self._channel.declare_queue(self._queue, durable=True)
- def publish_message(self, body: str, priority: int = None):
+ @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
+ async def publish_message(self, body: str, priority: int = None):
"""
Publish a message to RabbitMQ using the optional message priority.
:param body: A string to publish to RabbitMQ
:param priority: An optional integer priority to use for the message
:return: None
"""
- properties = pika.BasicProperties(content_type='text/plain',
- delivery_mode=2,
- priority=priority)
- self._channel.basic_publish(exchange='',
- routing_key=self._queue,
- body=body,
- properties=properties)
-
- def __del__(self):
- if self._connection:
- self._connection.close()
+ message = Message(body=body.encode('utf-8'), priority=priority, delivery_mode=DeliveryMode.PERSISTENT)
+ await self._channel.default_exchange.publish(message, routing_key=self._queue)
+
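[Editor's note] The rewritten publisher pairs aio-pika's connect_robust (which reconnects transparently) with a tenacity retry decorator, so publishes survive brief broker outages before the error propagates. For illustration only, a compressed standalone sketch of how the two libraries fit together; the connection URL and queue name are placeholders, not the collection manager's configuration:

    import asyncio
    from aio_pika import DeliveryMode, Message, connect_robust
    from tenacity import retry, stop_after_attempt, wait_fixed

    QUEUE = "example_queue"  # placeholder queue name

    @retry(wait=wait_fixed(5), stop=stop_after_attempt(4), reraise=True)
    async def publish(body: str):
        # connect_robust handles reconnects; the retry decorator covers the
        # window where the broker is entirely unreachable
        connection = await connect_robust("amqp://guest:guest@localhost/")
        try:
            channel = await connection.channel()
            await channel.declare_queue(QUEUE, durable=True)
            message = Message(body.encode("utf-8"),
                              delivery_mode=DeliveryMode.PERSISTENT)
            await channel.default_exchange.publish(message, routing_key=QUEUE)
        finally:
            await connection.close()

    asyncio.run(publish("hello"))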
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index f16bde3..47ae867 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,3 +4,5 @@ pysolr==3.8.1
pika==1.1.0
watchdog==0.10.2
requests==2.23.0
+aio-pika==6.6.1
+tenacity==6.2.0
\ No newline at end of file