Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:38 UTC

[incubator-sdap-ingester] 09/15: fixed scanning weirdness

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a7e3488093552aacced4a97d67ed717c458cb126
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700

    fixed scanning weirdness
---
 .../services/CollectionWatcher.py                  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
 import asyncio
 from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
 from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
 from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
                                                     CollectionConfigParsingError,
                                                     ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
 
-        if type(self._observer) == S3Observer:
+        if isinstance(self._observer, S3Observer):
             await self._observer.start()
         else:
             self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = os.path.getmtime(granule_path)
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    files_owned = glob(collection.path, recursive=True)
-                    for granule in files_owned:
-                        await self._granule_updated_callback(granule, collection)
-
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                if type(self._observer) == S3Observer:
+                if isinstance(self._observer, S3Observer):
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
                 else:
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
                     if isinstance(event, S3Event):
                         modified_time = event.modified_time
                     else:
-                        modified_time = os.path.getmtime(path)
+                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
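
Two of the hunks above replace `type(self._observer) == S3Observer` with `isinstance(self._observer, S3Observer)`. The difference is that `isinstance()` is subclass-aware, so the check keeps working if the observer is ever a subclass of S3Observer, such as a test double. A minimal standalone sketch (the classes here are hypothetical, not from this repository):

    class S3Observer:
        pass

    class FakeS3Observer(S3Observer):  # hypothetical test double
        pass

    observer = FakeS3Observer()

    # type() comparison demands an exact class match...
    print(type(observer) == S3Observer)      # False
    # ...while isinstance() also accepts subclasses.
    print(isinstance(observer, S3Observer))  # True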
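In the new _call_callback_for_all_granules method, each collection's `path` is handed to glob() as a pattern, and recursive=True lets a "**" segment descend into subdirectories. A minimal sketch of that scan, assuming a hypothetical pattern such as "/data/granules/**/*.nc":

    import os
    from glob import glob

    # Expand the collection's path pattern into concrete granule files;
    # recursive=True is required for "**" to match nested directories.
    pattern = "/data/granules/**/*.nc"  # hypothetical collection.path
    for granule_path in glob(pattern, recursive=True):
        # os.path.getmtime() returns the mtime as a POSIX timestamp (float).
        modified_time = os.path.getmtime(granule_path)
        print(granule_path, modified_time)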
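The final hunk wraps os.path.getmtime(path) in datetime.fromtimestamp(), so the watchdog branch of _GranuleEventHandler passes the callback a datetime rather than a raw float, matching the modified_time that an S3Event evidently provides. A minimal sketch of that conversion (the path and values are illustrative only):

    import os
    from datetime import datetime

    # getmtime() yields seconds since the epoch as a float;
    # fromtimestamp() converts it into a local-time datetime object.
    mtime = os.path.getmtime("/tmp")               # e.g. 1603990377.0
    modified_time = datetime.fromtimestamp(mtime)  # datetime instance
    print(modified_time)                           # e.g. 2020-10-29 16:32:57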