You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/21 23:42:45 UTC

[incubator-sdap-ingester] 02/06: Move directory scanning out of Collection class

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 3c39c301ad69f0654b3194de9a2d917277b8d123
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 2 13:33:46 2020 -0700

    Move directory scanning out of Collection class
---
 .../collection_manager/entities/Collection.py          |  3 ---
 collection_manager/collection_manager/main.py          |  1 -
 .../collection_manager/services/CollectionProcessor.py | 18 ++++++------------
 .../collection_manager/services/CollectionWatcher.py   |  8 +++++---
 .../services/history_manager/IngestionHistory.py       | 17 +++++++++--------
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..0feba0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -53,6 +53,3 @@ class Collection:
             return os.path.dirname(file_path) == self.path
         else:
             return fnmatch(file_path, self.path)
-
-    def files_owned(self) -> List[str]:
-        return glob(self.path, recursive=True)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index cbe22f9..3de4fdd 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -70,7 +70,6 @@ async def main():
             collection_processor = CollectionProcessor(message_publisher=publisher,
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                                   collection_updated_callback=collection_processor.process_collection,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh))
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index f08ade9..eb3bbae 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,15 @@
 import logging
 import os.path
+from glob import glob
 from typing import Dict
-import yaml
 
+import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+                                                         IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+    IngestionHistoryBuilder
 
 logger = logging.getLogger(__name__)
 
@@ -20,15 +23,6 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_collection(self, collection: Collection):
-        """
-        Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
-        :param collection: A Collection definition
-        :return: None
-        """
-        for granule in collection.files_owned():
-            await self.process_granule(granule, collection)
-
     async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..20ec7c7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -3,6 +3,7 @@ import logging
 import os
 import time
 from collections import defaultdict
+from glob import glob
 from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
@@ -21,14 +22,12 @@ logger.setLevel(logging.DEBUG)
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], Awaitable],
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
 
         self._collections_path = collections_path
-        self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
         self._collections_refresh_interval = collections_refresh_interval
 
@@ -107,7 +106,10 @@ class CollectionWatcher:
                 logger.info(f"Scanning files for {len(updated_collections)} collections...")
                 start = time.perf_counter()
                 for collection in updated_collections:
-                    await self._collection_updated_callback(collection)
+                    files_owned = glob(collection.path, recursive=True)
+                    for granule in files_owned:
+                        await self._granule_updated_callback(granule, collection)
+
                 logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
 
                 self._unschedule_watches()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index b71c32f..231d179 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -48,10 +48,11 @@ class IngestionHistory(ABC):
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
+        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = os.path.getmtime(file_path)
+            self._latest_ingested_file_update = file_modified_date
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
 
         await self._save_latest_timestamp()
 
@@ -73,9 +74,10 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
+        file_modified_date = os.path.getmtime(file_path)
+        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -114,15 +116,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(file, date_from: datetime = None, date_to: datetime = None):
+    def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
         """
         :param file: file path as a string
         :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        file_modified_time = os.path.getmtime(file)
-        is_after_from = date_from.timestamp() < file_modified_time if date_from else True
-        is_before_to = date_to.timestamp() > file_modified_time if date_to else True
+        is_after_from = start_date.timestamp() < date if start_date else True
+        is_before_to = end_date.timestamp() > date if end_date else True
 
         return is_after_from and is_before_to