You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 00:31:11 UTC

[incubator-sdap-ingester] 03/04: use shared class

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a5494acfa10751fffc26c7a0e1535d1c596e6e1b
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:55:39 2020 -0700

    use shared class
---
 .../collection_manager/services/CollectionWatcher.py          | 11 +++++++++--
 common/common/async_utils/AsyncUtils.py                       | 11 +++++++++++
 common/common/async_utils/__init__.py                         |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 0ae2b49..1fd1678 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+import time
 import logging
 import os
 from collections import defaultdict
@@ -101,11 +102,17 @@ class CollectionWatcher:
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
-            for collection in updated_collections:
-                await self._collection_updated_callback(collection)
             if len(updated_collections) > 0:
+                logger.info(f"Scanning files for {len(updated_collections)} collections...")
+                start = time.perf_counter()
+                for collection in updated_collections:
+                    await self._collection_updated_callback(collection)
+                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
                 self._unschedule_watches()
                 self._schedule_watches()
+            else:
+                logger.info("No updated collections, so no files to scan")
         except CollectionConfigParsingError as e:
             logger.error(e)
 
diff --git a/common/common/async_utils/AsyncUtils.py b/common/common/async_utils/AsyncUtils.py
new file mode 100644
index 0000000..5fefd45
--- /dev/null
+++ b/common/common/async_utils/AsyncUtils.py
@@ -0,0 +1,11 @@
+import asyncio
+import functools
+
+
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
diff --git a/common/common/async_utils/__init__.py b/common/common/async_utils/__init__.py
new file mode 100644
index 0000000..9a468e0
--- /dev/null
+++ b/common/common/async_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncUtils import run_in_executor