You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:38 UTC
[incubator-sdap-ingester] 09/15: fixed scanning weirdness
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a7e3488093552aacced4a97d67ed717c458cb126
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700
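fixed scanning weirdness: pass each granule's modification time to the update callback when scanning collections, and skip that full rescan for S3 collections (the S3Observer already reports files that have not yet been scanned)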
---
.../services/CollectionWatcher.py | 32 ++++++++++++----------
1 file changed, 18 insertions(+), 14 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
import asyncio
from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
from collection_manager.services.S3Observer import S3Event, S3Observer
import logging
import os
import time
from collections import defaultdict
from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
import yaml
-from collection_manager.entities import Collection
from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
CollectionConfigParsingError,
ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
wait_time=self._collections_refresh_interval,
func=self._reload_and_reschedule)
- if type(self._observer) == S3Observer:
+ if isinstance(self._observer, S3Observer):
await self._observer.start()
else:
self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
self._load_collections()
return self._collections() - old_collections
+ async def _call_callback_for_all_granules(self, collections: List[Collection]):
+ logger.info(f"Scanning files for {len(collections)} collections...")
+ start = time.perf_counter()
+ for collection in collections:
+ for granule_path in glob(collection.path, recursive=True):
+ modified_time = datetime.fromtimestamp(os.path.getmtime(granule_path))  # datetime, the same type _GranuleEventHandler passes to the callback
+ await self._granule_updated_callback(granule_path, modified_time, collection)
+ logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
async def _reload_and_reschedule(self):
try:
updated_collections = self._get_updated_collections()
if len(updated_collections) > 0:
- logger.info(f"Scanning files for {len(updated_collections)} collections...")
- start = time.perf_counter()
- for collection in updated_collections:
- files_owned = glob(collection.path, recursive=True)
- for granule in files_owned:
- await self._granule_updated_callback(granule, collection)
-
- logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+ # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+ # So we only need to rescan granules here if not using S3.
+ if not isinstance(self._observer, S3Observer):
+ await self._call_callback_for_all_granules(collections=updated_collections)
self._unschedule_watches()
self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
try:
- if type(self._observer) == S3Observer:
+ if isinstance(self._observer, S3Observer):
self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
else:
self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
if isinstance(event, S3Event):
modified_time = event.modified_time
else:
- modified_time = os.path.getmtime(path)
+ modified_time = datetime.fromtimestamp(os.path.getmtime(path))
self._loop.create_task(self._callback(path, modified_time, collection))
except IsADirectoryError:
return