You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:36 UTC

[incubator-sdap-ingester] 07/15: Properly scans S3, still needs S3 signature fun

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 883dddf75350a08953d4fb16715b4e404ece6d8a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 22 16:38:08 2020 -0700

    Properly scans S3, still needs S3 signature fun
---
 .../services/CollectionProcessor.py                |  7 ++---
 .../services/CollectionWatcher.py                  | 14 +++++++---
 .../collection_manager/services/S3Observer.py      | 18 ++++++++-----
 .../services/history_manager/IngestionHistory.py   | 31 ++++++++++++++--------
 .../tests/services/test_S3Observer.py              |  8 ++++++
 5 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ab8ce95..89d413b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -2,6 +2,7 @@ import logging
 import os.path
 from glob import glob
 from typing import Dict
+from datetime import datetime
 
 import yaml
 from collection_manager.entities import Collection
@@ -23,7 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -34,7 +35,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -54,7 +55,7 @@ class CollectionProcessor:
 
         dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        await history_manager.push(granule)
+        await history_manager.push(granule, modified_time)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 87b1ac3..f885b1c 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import asyncio
+from datetime import datetime
 from collection_manager.entities.Collection import CollectionStorageType
-from collection_manager.services.S3Observer import S3Observer
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
@@ -39,7 +40,7 @@ class CollectionWatcher:
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
 
         if s3:
-            self._observer = S3Observer('nexus-ingest')
+            self._observer = S3Observer('nexus-ingest', initial_scan=True)
         else:
             self._observer = Observer()
 
@@ -197,9 +198,14 @@ class _GranuleEventHandler(FileSystemEventHandler):
         self._handle_event(event)
 
     def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
             try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = event.modified_time
+                    else:
+                        modified_time = os.path.getmtime(path)
+                    self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 376a907..d204890 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -1,4 +1,5 @@
 import asyncio
+from urllib.parse import urlparse
 import datetime
 import os
 import time
@@ -14,6 +15,7 @@ os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
 @dataclass
 class S3Event:
     src_path: str
+    modified_time: datetime.datetime
 
 
 class S3FileModifiedEvent(S3Event):
@@ -48,7 +50,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self,event_handler, path: str):
+    def schedule(self, event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -90,9 +92,9 @@ class S3Observer:
                 file_is_new = file not in self._cache
 
                 if file_is_new:
-                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, modified_time=modified_date))
                 else:
-                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, modified_time=modified_date))
 
         self._cache = new_cache
         self._has_polled = True
@@ -104,9 +106,9 @@ class S3Observer:
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
 
-            # we need the key without the bucket name
-            async for file in bucket.objects.filter(Prefix=path):
-                new_cache[file.key] = await file.last_modified
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await file.last_modified
         end = time.perf_counter()
         duration = end - start
 
@@ -114,6 +116,10 @@ class S3Observer:
 
         return new_cache
 
+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+
 
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index 231d179..ea50ffb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -6,6 +7,8 @@ from datetime import datetime
 from enum import Enum
 from typing import Optional
 
+from botocore.compat import filter_ssl_warnings
+
 logger = logging.getLogger(__name__)
 
 BLOCK_SIZE = 65536
@@ -37,27 +40,26 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_time: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
-        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = file_modified_date
+            self._latest_ingested_file_update = modified_time 
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
 
         await self._save_latest_timestamp()
 
     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_time: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -74,14 +76,22 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        file_modified_date = os.path.getmtime(file_path)
-        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of just the full path?
+        # The only reason this is here right now is for backwards compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
     def _latest_ingested_mtime(self) -> Optional[datetime]:
         """
         Return the modified time of the most recently modified file that was ingested.
@@ -98,8 +108,7 @@ class IngestionHistory(ABC):
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         return signature == await self._get_signature(file_name)
 
diff --git a/collection_manager/tests/services/test_S3Observer.py b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))