You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/07 00:28:19 UTC
[incubator-sdap-ingester] branch s3-support updated: wip
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/s3-support by this push:
new 53180f2 wip
53180f2 is described below
commit 53180f2ffc98105032641a5d177aac6959a9c2c1
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 17:20:36 2020 -0700
wip
---
.../collection_manager/entities/Collection.py | 16 +++++++-----
.../services/CollectionWatcher.py | 29 ++++++++++++++++------
.../collection_manager/services/S3Observer.py | 6 ++---
3 files changed, 35 insertions(+), 16 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 0feba0e..b49ecbb 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,4 +1,5 @@
import os
+from urllib.parse import urlparse
from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
@@ -46,10 +47,13 @@ class Collection:
return os.path.dirname(self.path)
def owns_file(self, file_path: str) -> bool:
- if os.path.isdir(file_path):
- raise IsADirectoryError()
-
- if os.path.isdir(self.path):
- return os.path.dirname(file_path) == self.path
+ if urlparse(file_path).scheme == 's3':
+ return file_path.find(self.path) == 0
else:
- return fnmatch(file_path, self.path)
+ if os.path.isdir(file_path):
+ raise IsADirectoryError()
+
+ if os.path.isdir(self.path):
+ return os.path.dirname(file_path) == self.path
+ else:
+ return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 499a17d..215e80e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
import asyncio
+from collection_manager.services.S3Observer import S3Observer
import logging
import os
import time
@@ -8,10 +9,12 @@ from typing import Awaitable, Callable, Dict, Optional, Set
import yaml
from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
- CollectionConfigFileNotFoundError, CollectionConfigParsingError,
- ConflictingPathCollectionError, MissingValueCollectionError,
- RelativePathCollectionError, RelativePathError)
+from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
+ CollectionConfigParsingError,
+ ConflictingPathCollectionError,
+ MissingValueCollectionError,
+ RelativePathCollectionError,
+ RelativePathError)
from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
from watchdog.observers.polling import PollingObserver as Observer
@@ -23,6 +26,7 @@ class CollectionWatcher:
def __init__(self,
collections_path: str,
granule_updated_callback: Callable[[str, Collection], Awaitable],
+ s3: bool = False,
collections_refresh_interval: float = 30):
if not os.path.isabs(collections_path):
raise RelativePathError("Collections config path must be an absolute path.")
@@ -32,7 +36,11 @@ class CollectionWatcher:
self._collections_refresh_interval = collections_refresh_interval
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
- self._observer = Observer()
+
+ if s3:
+ self._observer = S3Observer('nexus-ingest')
+ else:
+ self._observer = Observer()
self._granule_watches = set()
@@ -47,7 +55,11 @@ class CollectionWatcher:
await self._run_periodically(loop=loop,
wait_time=self._collections_refresh_interval,
func=self._reload_and_reschedule)
- self._observer.start()
+
+ if type(self._observer) == S3Observer:
+ await self._observer.start()
+ else:
+ self._observer.start()
def _collections(self) -> Set[Collection]:
"""
@@ -130,7 +142,10 @@ class CollectionWatcher:
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
try:
- self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
+ if type(self._observer) == S3Observer:
+ self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+ else:
+ self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
except (FileNotFoundError, NotADirectoryError):
bad_collection_names = ' and '.join([col.dataset_id for col in collections])
logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 9a86d1e..7720432 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -48,7 +48,7 @@ class S3Observer:
def unschedule(self, watch: S3Watch):
self._watches.remove(watch)
- def schedule(self, path: str, event_handler):
+ def schedule(self,event_handler, path: str):
watch = S3Watch(path=path, event_handler=event_handler)
self._watches.add(watch)
return watch
@@ -116,8 +116,8 @@ class S3Observer:
async def test():
observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
handler = Handler()
- observer.schedule('avhrr/2012', handler)
- observer.schedule('avhrr/2013', handler)
+ observer.schedule(handler, 'avhrr/2012')
+ observer.schedule(handler, 'avhrr/2013')
await observer.start()