Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/11 00:31:08 UTC

[incubator-sdap-ingester] branch async-history updated (ed86c3d -> 3968bd7)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard ed86c3d  wip
 discard 6aed6c4  use shared class
 discard 54a1335  async solr history
 discard dc00585  SDAP-277: Improved error handling in Granule Ingester (#15)
 discard 03badb4  SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
 discard 6ebf49f  SDAP-266: add README note on synchronization of configmap with local path (#14)
 discard fec3299  SDAP-273: Configure max threads in Granule Ingester (#13)
    omit 13e7b9b  log all fs events
     add 68110d0  SDAP-273: Configure max threads in Granule Ingester (#13)
     add d94c89f  SDAP-266: add README note on synchronization of configmap with local path (#14)
     add fe6a1c5  SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
     add 4d816f6  SDAP-277: Improved error handling in Granule Ingester (#15)
     new 774f928  log all fs events
     new 93618e9  async solr history
     new a5494ac  use shared class
     new 3968bd7  wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ed86c3d)
            \
             N -- N -- N   refs/heads/async-history (3968bd7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-sdap-ingester] 01/04: log all fs events

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 774f928f00efd10ceee02dcbe101b34e8a68e61c
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 15:26:17 2020 -0700

    log all fs events
---
 collection_manager/collection_manager/services/CollectionWatcher.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 8911806..63dd30c 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -161,6 +161,11 @@ class _GranuleEventHandler(FileSystemEventHandler):
         self._callback = callback
         self._collections_for_dir = collections_for_dir
 
+    def on_any_event(self, event):
+        super().on_created(event)
+
+        logger.info(f"Collection Watcher received event: {event}")
+
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
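
One thing worth flagging in this hunk: the new on_any_event calls super().on_created(event). That is harmless, since the base-class handlers in watchdog are no-ops, but it is presumably a copy-paste slip from the on_created method below it; the matching call would be super().on_any_event(event). Because watchdog dispatches on_any_event before the type-specific handler, this single hook logs every filesystem event the observer sees. A corrected sketch (hypothetical, not the committed code):

    import logging

    from watchdog.events import FileSystemEventHandler

    logger = logging.getLogger(__name__)


    class _GranuleEventHandler(FileSystemEventHandler):
        def on_any_event(self, event):
            # watchdog calls on_any_event for every event type (created,
            # modified, moved, deleted) before dispatching to the
            # specific on_* handler, so one hook here logs everything.
            super().on_any_event(event)
            logger.info(f"Collection Watcher received event: {event}")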


[incubator-sdap-ingester] 04/04: wip

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 3968bd7d19e28bbee5cb33e4c35582b42e11db3d
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Aug 10 17:29:32 2020 -0700

    wip
---
 .../services/CollectionWatcher.py                  |  2 --
 .../history_manager/FileIngestionHistory.py        |  6 +++---
 .../history_manager/test_FileIngestionHistory.py   | 24 ++++++++++++++--------
 .../tests/services/test_CollectionProcessor.py     |  2 +-
 .../granule_ingester/writers/SolrStore.py          | 18 +++++-----------
 5 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1fd1678..49acceb 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -111,8 +111,6 @@ class CollectionWatcher:
 
                 self._unschedule_watches()
                 self._schedule_watches()
-            else:
-                logger.info("No updated collections, so no files to scan")
         except CollectionConfigParsingError as e:
             logger.error(e)
 
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 140ae87..ffa065f 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -66,7 +66,7 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"history cache {self._history_file_path} does not exist, does not need to be removed")
 
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         if self._latest_ingested_file_update:
             with open(self._latest_ingested_file_update_file_path, 'w') as f_ts:
                 f_ts.write(f'{str(self._latest_ingested_file_update)}\n')
@@ -90,10 +90,10 @@ class FileIngestionHistory(IngestionHistory):
         except FileNotFoundError:
             logger.info(f"no history file {self._history_file_path} to purge")
 
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         self._history_dict[file_name] = signature
 
         self._history_file.write(f'{file_name},{signature}\n')
 
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         return self._history_dict.get(file_name, None)
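
These three methods gain async purely to satisfy the now-async IngestionHistory interface introduced in the "async solr history" commit; their bodies are unchanged synchronous dict and file operations, which therefore still run on the event loop thread. The practical effect is on callers, which now have to await them. A hypothetical caller, given the patched class:

    async def record_and_check(history, file_name, signature):
        # Both calls are coroutines now and must be awaited, even though
        # the underlying dict/file I/O remains blocking.
        await history._push_record(file_name, signature)
        return await history._get_signature(file_name)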
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index d2ad45c..ff1fb3c 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -3,25 +3,30 @@ import pathlib
 import tempfile
 import unittest
 
-from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath
+from collection_manager.services.history_manager import (FileIngestionHistory,
+                                                         md5sum_from_filepath)
+
+from common.async_test_utils.AsyncTestUtils import async_test
 
 DATASET_ID = "zobi_la_mouche"
 
 
 class TestFileIngestionHistory(unittest.TestCase):
 
-    def test_get_md5sum(self):
+    @async_test
+    async def test_get_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("blue")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("blue")
             self.assertEqual(result, "12weeukrhbwerqu7wier")
 
-    def test_get_missing_md5sum(self):
+    @async_test
+    async def test_get_missing_md5sum(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
-            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-            result = ingestion_history._get_signature("green")
+            await ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+            result = await ingestion_history._get_signature("green")
             self.assertIsNone(result)
 
     def test_already_ingested(self):
@@ -44,12 +49,13 @@ class TestFileIngestionHistory(unittest.TestCase):
 
             del ingestion_history
 
-    def test_already_ingested_is_false(self):
+    @async_test
+    async def test_already_ingested_is_false(self):
         with tempfile.TemporaryDirectory() as history_dir:
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':
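
The tests import async_test from common.async_test_utils.AsyncTestUtils, a module not included in these patches. A minimal sketch of what such a decorator typically looks like (an assumption, not the committed code): it runs the coroutine to completion on a fresh event loop so the synchronous unittest.TestCase machinery can call the test directly.

    import asyncio
    import functools

    def async_test(coro):
        # Allow an "async def test_*" method to be driven by unittest's
        # synchronous runner by executing it on a new event loop.
        @functools.wraps(coro)
        def wrapper(*args, **kwargs):
            return asyncio.run(coro(*args, **kwargs))
        return wrapper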
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index aa143f3..fec4726 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -114,7 +114,7 @@ class TestCollectionProcessor(unittest.TestCase):
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', new_callable=AsyncMock)
     async def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
-                                                                             mock_history_builder, mock_history):
+                                                                                   mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
         mock_history_builder.build.return_value = mock_history
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 65f8b09..67532b5 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,28 +21,20 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
-from common import AsyncUtils
 
 import pysolr
-from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.exceptions import NoNodeError
-from nexusproto.DataTile_pb2 import TileSummary, NexusTile
+from kazoo.handlers.threading import KazooTimeoutError
 
-from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.exceptions import (SolrFailedHealthCheckError,
+                                         SolrLostConnectionError)
 from granule_ingester.writers.MetadataStore import MetadataStore
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
 
 logger = logging.getLogger(__name__)
 
 
-def run_in_executor(f):
-    @functools.wraps(f)
-    def inner(*args, **kwargs):
-        loop = asyncio.get_running_loop()
-        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
-
-    return inner
-
-
 class SolrStore(MetadataStore):
     def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
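
Note that the inline run_in_executor helper deleted from this file is identical to the one promoted to common/common/async_utils/AsyncUtils.py in the "use shared class" commit (03/04, below); this hunk just switches SolrStore over to the shared import.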


[incubator-sdap-ingester] 02/04: async solr history

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 93618e9a2e08d9dbf618469044b6a9c4d2d14efa
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:15:11 2020 -0700

    async solr history
---
 .../services/CollectionProcessor.py                |  8 +++----
 .../services/CollectionWatcher.py                  |  1 -
 .../history_manager/FileIngestionHistory.py        |  3 ++-
 .../services/history_manager/IngestionHistory.py   | 26 +++++++++++-----------
 .../history_manager/SolrIngestionHistory.py        | 11 +++++----
 collection_manager/docker/Dockerfile               | 12 +++++-----
 config_operator/config_operator/main.py            |  1 +
 .../granule_ingester/writers/SolrStore.py          |  1 +
 8 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index d790f4b..fc91e01 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -45,7 +45,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -59,13 +59,13 @@ class CollectionProcessor:
                         f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
-                        f"time range for collection '{collection.dataset_id}'. Skipping.")
+            logger.debug(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                         f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        history_manager.push(granule)
+        await history_manager.push(granule)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 63dd30c..0ae2b49 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -6,7 +6,6 @@ from typing import Dict, Callable, Set, Optional, Awaitable
 import yaml
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
-from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 50f2170..140ae87 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -28,7 +28,8 @@ class FileIngestionHistory(IngestionHistory):
         Constructor
         :param history_path:
         :param dataset_id:
-        :param signature_fun: function which create the signature of the cache, a file path string as argument and returns a string (md5sum, time stamp)
+        :param signature_fun: function which creates the signature of the cache,
+                              a file path string as argument and returns a string (md5sum, time stamp)
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d92cb24..ef73ccb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -37,7 +37,7 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    def push(self, file_path: str):
+    async def push(self, file_path: str):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
@@ -46,14 +46,14 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        self._push_record(file_name, signature)
+        await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
             self._latest_ingested_file_update = os.path.getmtime(file_path)
         else:
             self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
 
-        self._save_latest_timestamp()
+        await self._save_latest_timestamp()
 
     def latest_ingested_mtime(self) -> Optional[datetime]:
         """
@@ -65,7 +65,7 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    def already_ingested(self, file_path: str) -> bool:
+    async def already_ingested(self, file_path: str) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
@@ -74,12 +74,12 @@ class IngestionHistory(ABC):
         file_path = file_path.strip()
         file_name = os.path.basename(file_path)
         signature = self._signature_fun(file_path)
-        return signature == self._get_signature(file_name)
+        return signature == await self._get_signature(file_name)
 
-    def get_granule_status(self,
-                           file_path: str,
-                           date_from: datetime = None,
-                           date_to: datetime = None) -> GranuleStatus:
+    async def get_granule_status(self,
+                                 file_path: str,
+                                 date_from: datetime = None,
+                                 date_to: datetime = None) -> GranuleStatus:
         """
         Get the history status of a granule. DESIRED_FORWARD_PROCESSING means the granule has not yet been ingested
         and and is newer than the newest granule that was ingested (see IngestionHistory.latest_ingested_mtime).
@@ -96,21 +96,21 @@ class IngestionHistory(ABC):
         """
         if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
     @abstractmethod
-    def _save_latest_timestamp(self):
+    async def _save_latest_timestamp(self):
         pass
 
     @abstractmethod
-    def _push_record(self, file_name, signature):
+    async def _push_record(self, file_name, signature):
         pass
 
     @abstractmethod
-    def _get_signature(self, file_name):
+    async def _get_signature(self, file_name):
         pass
 
     @staticmethod
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 79d6eef..59f5cd7 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -4,10 +4,12 @@ import logging
 import pysolr
 import requests
 
+from common.async_utils.AsyncUtils import run_in_executor
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
 from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
+logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -46,6 +48,7 @@ class SolrIngestionHistory(IngestionHistory):
     def __del__(self):
         self._req_session.close()
 
+    @run_in_executor
     def _push_record(self, file_name, signature):
         hash_id = doc_key(self._dataset_id, file_name)
         self._solr_granules.delete(q=f"id:{hash_id}")
@@ -57,6 +60,7 @@ class SolrIngestionHistory(IngestionHistory):
         self._solr_granules.commit()
         return None
 
+    @run_in_executor
     def _save_latest_timestamp(self):
         if self._solr_datasets:
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
@@ -73,6 +77,7 @@ class SolrIngestionHistory(IngestionHistory):
         else:
             return None
 
+    @run_in_executor
     def _get_signature(self, file_name):
         hash_id = doc_key(self._dataset_id, file_name)
         results = self._solr_granules.search(q=f"id:{hash_id}")
@@ -110,9 +115,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "granule_s", "string")
                 self._add_field(schema_endpoint, "granule_signature_s", "string")
 
-            else:
-                logger.info(f"collection {self._granule_collection_name} already exists")
-
             if self._dataset_collection_name not in existing_collections:
                 # Create collection
                 payload = {'action': 'CREATE',
@@ -128,9 +130,6 @@ class SolrIngestionHistory(IngestionHistory):
                 self._add_field(schema_endpoint, "dataset_s", "string")
                 self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
 
-            else:
-                logger.info(f"collection {self._dataset_collection_name} already exists")
-
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
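
Unlike FileIngestionHistory, which implements the abstract interface with native async def methods, these Solr-backed methods stay written as plain synchronous code (blocking pysolr and requests calls) and rely on @run_in_executor to hand back awaitables. A minimal sketch of the resulting calling pattern, assuming an already-constructed SolrIngestionHistory instance:

    async def check_granule(history, file_name):
        # _get_signature is ordinary sync code, but the decorator runs it
        # on the event loop's default ThreadPoolExecutor and returns a
        # future, so it can be awaited without blocking the loop.
        return await history._get_signature(file_name)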
diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index ce1b577..2a57784 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -5,12 +5,14 @@ RUN curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add
 RUN echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list
 RUN apt-get update && apt-get install -y kubectl
 
-COPY /collection_manager /collection_manager/collection_manager
-COPY /setup.py /collection_manager/setup.py
-COPY /requirements.txt /collection_manager/requirements.txt
-COPY /README.md /collection_manager/README.md
-COPY /docker/entrypoint.sh /entrypoint.sh
+COPY common /common
+COPY collection_manager/collection_manager /collection_manager/collection_manager
+COPY collection_manager/setup.py /collection_manager/setup.py
+COPY collection_manager/requirements.txt /collection_manager/requirements.txt
+COPY collection_manager/README.md /collection_manager/README.md
+COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
+RUN cd /common && python setup.py install
 RUN cd /collection_manager && python setup.py install
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
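
Note the COPY sources are now paths relative to the repository root (common/..., collection_manager/...), which implies the image has to be built with the repo root as the Docker build context, presumably something like "docker build -f collection_manager/docker/Dockerfile ." run from the top of the repository, so that the shared common package can be copied in and installed first.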
diff --git a/config_operator/config_operator/main.py b/config_operator/config_operator/main.py
index fbbbe6b..1df9cf6 100644
--- a/config_operator/config_operator/main.py
+++ b/config_operator/config_operator/main.py
@@ -4,6 +4,7 @@ import kopf
 from config_operator.config_source import RemoteGitConfig, LocalDirConfig
 from config_operator.k8s import K8sConfigMap
 
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index e098672..65f8b09 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,6 +21,7 @@ from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
+from common import AsyncUtils
 
 import pysolr
 from kazoo.handlers.threading import KazooTimeoutError
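
Note that this commit already imports from the common package (common.async_utils.AsyncUtils in SolrIngestionHistory.py, and common.AsyncUtils here in SolrStore.py), yet common/common/async_utils/ is only created in the "use shared class" commit (03/04, below). As intermediate states of a wip branch that was later force-pushed, these trees presumably do not all build in isolation; the final "wip" commit settles on the common.async_utils.AsyncUtils import path.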


[incubator-sdap-ingester] 03/04: use shared class

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch async-history
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a5494acfa10751fffc26c7a0e1535d1c596e6e1b
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Aug 6 16:55:39 2020 -0700

    use shared class
---
 .../collection_manager/services/CollectionWatcher.py          | 11 +++++++++--
 common/common/async_utils/AsyncUtils.py                       | 11 +++++++++++
 common/common/async_utils/__init__.py                         |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 0ae2b49..1fd1678 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+import time
 import logging
 import os
 from collections import defaultdict
@@ -101,11 +102,17 @@ class CollectionWatcher:
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
-            for collection in updated_collections:
-                await self._collection_updated_callback(collection)
             if len(updated_collections) > 0:
+                logger.info(f"Scanning files for {len(updated_collections)} collections...")
+                start = time.perf_counter()
+                for collection in updated_collections:
+                    await self._collection_updated_callback(collection)
+                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
                 self._unschedule_watches()
                 self._schedule_watches()
+            else:
+                logger.info("No updated collections, so no files to scan")
         except CollectionConfigParsingError as e:
             logger.error(e)
 
diff --git a/common/common/async_utils/AsyncUtils.py b/common/common/async_utils/AsyncUtils.py
new file mode 100644
index 0000000..5fefd45
--- /dev/null
+++ b/common/common/async_utils/AsyncUtils.py
@@ -0,0 +1,11 @@
+import asyncio
+import functools
+
+
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
diff --git a/common/common/async_utils/__init__.py b/common/common/async_utils/__init__.py
new file mode 100644
index 0000000..9a468e0
--- /dev/null
+++ b/common/common/async_utils/__init__.py
@@ -0,0 +1 @@
+from .AsyncUtils import run_in_executor
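
For reference, a self-contained sketch of how this decorator behaves, with a stand-in blocking function rather than the real pysolr calls: when the decorated function is called inside a running event loop, it returns a future executing on the loop's default ThreadPoolExecutor, so it can be awaited while the loop stays free for other tasks.

    import asyncio
    import functools
    import time

    def run_in_executor(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return loop.run_in_executor(None, lambda: f(*args, **kwargs))
        return inner

    @run_in_executor
    def blocking_io():
        time.sleep(1)  # stands in for a blocking pysolr/requests call
        return "done"

    async def main():
        # blocking_io() returns a future bound to the default executor;
        # awaiting it does not block the event loop.
        print(await blocking_io())

    asyncio.run(main())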