Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:36 UTC
[incubator-sdap-ingester] 07/15: Properly scans S3, still needs S3 signature fun
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 883dddf75350a08953d4fb16715b4e404ece6d8a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 22 16:38:08 2020 -0700
Properly scans S3, still needs S3 signature fun
---
.../services/CollectionProcessor.py | 8 +++---
.../services/CollectionWatcher.py | 14 +++++++---
.../collection_manager/services/S3Observer.py | 18 ++++++++-----
.../services/history_manager/IngestionHistory.py | 30 +++++++++++++--------
.../tests/services/test_S3Observer.py | 8 ++++++
5 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ab8ce95..89d413b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -2,6 +2,7 @@ import logging
 import os.path
 from glob import glob
 from typing import Dict
+from datetime import datetime
 
 import yaml
 from collection_manager.entities import Collection
@@ -23,7 +24,8 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
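+        :param modified_time: The time at which the granule file was last modified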
@@ -34,7 +36,7 @@
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -54,7 +56,7 @@
 
         dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        await history_manager.push(granule)
+        await history_manager.push(granule, modified_time)
 
     @staticmethod
     def _file_supported(file_path: str):
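
The upshot of these changes: callers must now resolve a granule's modification time before calling process_granule, since mtimes are no longer read inside the history manager. A minimal usage sketch; the ingest_local_granule helper and its arguments are illustrative, not part of this commit:

    import os
    from datetime import datetime

    async def ingest_local_granule(processor, collection, path: str):
        # Local files get their mtime from the filesystem; S3 objects get
        # theirs from the bucket listing (see CollectionWatcher below).
        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
        await processor.process_granule(path, modified_time, collection)
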
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 87b1ac3..f885b1c 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import asyncio
+from datetime import datetime
 from collection_manager.entities.Collection import CollectionStorageType
-from collection_manager.services.S3Observer import S3Observer
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
@@ -39,7 +40,7 @@ class CollectionWatcher:
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
 
         if s3:
-            self._observer = S3Observer('nexus-ingest')
+            self._observer = S3Observer('nexus-ingest', initial_scan=True)
         else:
             self._observer = Observer()
 
@@ -197,9 +198,14 @@ class _GranuleEventHandler(FileSystemEventHandler):
         self._handle_event(event)
 
     def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
             try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = event.modified_time
+                    else:
+                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
+                    self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
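
Both event flavors now carry enough information to derive a modification time. A sketch of the dispatch in isolation; extract_modified_time is an illustrative helper, not part of this commit:

    import os
    from datetime import datetime

    from collection_manager.services.S3Observer import S3Event

    def extract_modified_time(event) -> datetime:
        # S3 events carry the object's LastModified timestamp directly;
        # filesystem events fall back to stat()-ing the path.
        if isinstance(event, S3Event):
            return event.modified_time
        return datetime.fromtimestamp(os.path.getmtime(event.src_path))
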
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 376a907..d204890 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -1,4 +1,5 @@
 import asyncio
+from urllib.parse import urlparse
 import datetime
 import os
 import time
@@ -14,6 +15,7 @@ os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
 @dataclass
 class S3Event:
     src_path: str
+    modified_time: datetime.datetime
 
 
 class S3FileModifiedEvent(S3Event):
@@ -48,7 +50,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self,event_handler, path: str):
+    def schedule(self, event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -90,9 +92,9 @@ class S3Observer:
             file_is_new = file not in self._cache
 
             if file_is_new:
-                watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, modified_time=modified_date))
             else:
-                watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+                watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, modified_time=modified_date))
 
         self._cache = new_cache
         self._has_polled = True
@@ -104,9 +106,9 @@ class S3Observer:
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
 
-            # we need the key without the bucket name
-            async for file in bucket.objects.filter(Prefix=path):
-                new_cache[file.key] = await file.last_modified
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await file.last_modified
 
         end = time.perf_counter()
         duration = end - start
@@ -114,6 +116,10 @@
 
         return new_cache
 
+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+
 
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
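
_get_object_key leans on urlparse putting the bucket into netloc and the key into path, so stripping the leading slash yields the bare object key. A quick sketch with an illustrative URL:

    from urllib.parse import urlparse

    url = urlparse('s3://nexus-ingest/some-dataset/granule.nc')
    print(url.netloc)           # nexus-ingest  (the bucket, dropped from the key)
    print(url.path.strip('/'))  # some-dataset/granule.nc  (what Prefix= receives)
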
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index 231d179..ea50ffb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -37,27 +38,27 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_time: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
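+        :param modified_time: The time at which the file was last modified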
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
-        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = file_modified_date
+            self._latest_ingested_file_update = modified_time
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
 
         await self._save_latest_timestamp()
 
     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_time: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -74,14 +75,22 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        file_modified_date = os.path.getmtime(file_path)
-        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of just the full path?
+        # The only reason this is here right now is for backwards compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
     def _latest_ingested_mtime(self) -> Optional[datetime]:
         """
         Return the modified time of the most recently modified file that was ingested.
@@ -98,8 +107,7 @@ class IngestionHistory(ABC):
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         return signature == await self._get_signature(file_name)
 
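
Note the asymmetry the TODO above calls out: S3 URLs are recorded under their full object key, while local paths still collapse to the basename for backwards compatibility with existing history records. A standalone sketch of that behavior, with illustrative paths:

    import os
    from urllib.parse import urlparse

    def standardized(file_path: str) -> str:
        # Mirrors IngestionHistory._get_standardized_path above.
        file_path = file_path.strip()
        if urlparse(file_path).scheme == 's3':
            return urlparse(file_path).path.strip('/')
        return os.path.basename(file_path)

    print(standardized('s3://test-bucket/test_dir/object.nc'))  # test_dir/object.nc
    print(standardized('/data/test_dir/object.nc'))             # object.nc
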
diff --git a/collection_manager/tests/services/test_S3Observer.py b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))
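
A companion test for the history-side path handling might look like the following; it is an illustrative sketch, not part of this commit, and the import path assumes the module layout shown in the diffs above:

    import unittest

    from collection_manager.services.history_manager.IngestionHistory import IngestionHistory

    class TestStandardizedPath(unittest.TestCase):

        def test_s3_path_keeps_full_key(self):
            self.assertEqual('test_dir/object.nc',
                             IngestionHistory._get_standardized_path('s3://test-bucket/test_dir/object.nc'))

        def test_local_path_collapses_to_basename(self):
            self.assertEqual('object.nc',
                             IngestionHistory._get_standardized_path('/data/test_dir/object.nc'))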