You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/21 23:42:45 UTC
[incubator-sdap-ingester] 02/06: Move directory scanning out of Collection class
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 3c39c301ad69f0654b3194de9a2d917277b8d123
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 2 13:33:46 2020 -0700
Move directory scanning out of Collection class
---
.../collection_manager/entities/Collection.py | 3 ---
collection_manager/collection_manager/main.py | 1 -
.../collection_manager/services/CollectionProcessor.py | 18 ++++++------------
.../collection_manager/services/CollectionWatcher.py | 8 +++++---
.../services/history_manager/IngestionHistory.py | 17 +++++++++--------
5 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..0feba0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -53,6 +53,3 @@ class Collection:
return os.path.dirname(file_path) == self.path
else:
return fnmatch(file_path, self.path)
-
- def files_owned(self) -> List[str]:
- return glob(self.path, recursive=True)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index cbe22f9..3de4fdd 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -70,7 +70,6 @@ async def main():
collection_processor = CollectionProcessor(message_publisher=publisher,
history_manager_builder=history_manager_builder)
collection_watcher = CollectionWatcher(collections_path=options.collections_path,
- collection_updated_callback=collection_processor.process_collection,
granule_updated_callback=collection_processor.process_granule,
collections_refresh_interval=int(options.refresh))
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index f08ade9..eb3bbae 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,15 @@
import logging
import os.path
+from glob import glob
from typing import Dict
-import yaml
+import yaml
from collection_manager.entities import Collection
from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+ IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+ IngestionHistoryBuilder
logger = logging.getLogger(__name__)
@@ -20,15 +23,6 @@ class CollectionProcessor:
self._history_manager_builder = history_manager_builder
self._history_manager_cache: Dict[str, IngestionHistory] = {}
- async def process_collection(self, collection: Collection):
- """
- Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
- :param collection: A Collection definition
- :return: None
- """
- for granule in collection.files_owned():
- await self.process_granule(granule, collection)
-
async def process_granule(self, granule: str, collection: Collection):
"""
Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..20ec7c7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -3,6 +3,7 @@ import logging
import os
import time
from collections import defaultdict
+from glob import glob
from typing import Awaitable, Callable, Dict, Optional, Set
import yaml
@@ -21,14 +22,12 @@ logger.setLevel(logging.DEBUG)
class CollectionWatcher:
def __init__(self,
collections_path: str,
- collection_updated_callback: Callable[[Collection], Awaitable],
granule_updated_callback: Callable[[str, Collection], Awaitable],
collections_refresh_interval: float = 30):
if not os.path.isabs(collections_path):
raise RelativePathError("Collections config path must be an absolute path.")
self._collections_path = collections_path
- self._collection_updated_callback = collection_updated_callback
self._granule_updated_callback = granule_updated_callback
self._collections_refresh_interval = collections_refresh_interval
@@ -107,7 +106,10 @@ class CollectionWatcher:
logger.info(f"Scanning files for {len(updated_collections)} collections...")
start = time.perf_counter()
for collection in updated_collections:
- await self._collection_updated_callback(collection)
+ files_owned = glob(collection.path, recursive=True)
+ for granule in files_owned:
+ await self._granule_updated_callback(granule, collection)
+
logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
self._unschedule_watches()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index b71c32f..231d179 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -48,10 +48,11 @@ class IngestionHistory(ABC):
signature = self._signature_fun(file_path)
await self._push_record(file_name, signature)
+ file_modified_date = os.path.getmtime(file_path)
if not self._latest_ingested_file_update:
- self._latest_ingested_file_update = os.path.getmtime(file_path)
+ self._latest_ingested_file_update = file_modified_date
else:
- self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+ self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
await self._save_latest_timestamp()
@@ -73,9 +74,10 @@ class IngestionHistory(ABC):
should fall in order to be "desired".
:return: A GranuleStatus enum.
"""
- if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
+ file_modified_date = os.path.getmtime(file_path)
+ if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
return GranuleStatus.DESIRED_FORWARD_PROCESSING
- elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
+ elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
return GranuleStatus.DESIRED_HISTORICAL
else:
return GranuleStatus.UNDESIRED
@@ -114,15 +116,14 @@ class IngestionHistory(ABC):
pass
@staticmethod
- def _in_time_range(file, date_from: datetime = None, date_to: datetime = None):
+ def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
"""
:param file: file path as a string
:param date_from: timestamp, can be None
:param date_to: timestamp, can be None
:return: True is the update time of the file is between ts_from and ts_to. False otherwise
"""
- file_modified_time = os.path.getmtime(file)
- is_after_from = date_from.timestamp() < file_modified_time if date_from else True
- is_before_to = date_to.timestamp() > file_modified_time if date_to else True
+ is_after_from = start_date.timestamp() < date if start_date else True
+ is_before_to = end_date.timestamp() > date if end_date else True
return is_after_from and is_before_to