You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/07 00:28:19 UTC

[incubator-sdap-ingester] branch s3-support updated: wip

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/s3-support by this push:
     new 53180f2  wip
53180f2 is described below

commit 53180f2ffc98105032641a5d177aac6959a9c2c1
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 17:20:36 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 16 +++++++-----
 .../services/CollectionWatcher.py                  | 29 ++++++++++++++++------
 .../collection_manager/services/S3Observer.py      |  6 ++---
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 0feba0e..b49ecbb 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,4 +1,5 @@
 import os
+from urllib.parse import urlparse
 from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
@@ -46,10 +47,13 @@ class Collection:
             return os.path.dirname(self.path)
 
     def owns_file(self, file_path: str) -> bool:
-        if os.path.isdir(file_path):
-            raise IsADirectoryError()
-
-        if os.path.isdir(self.path):
-            return os.path.dirname(file_path) == self.path
+        if urlparse(file_path).scheme == 's3':
+            return file_path.find(self.path) == 0
         else:
-            return fnmatch(file_path, self.path)
+            if os.path.isdir(file_path):
+                raise IsADirectoryError()
+
+            if os.path.isdir(self.path):
+                return os.path.dirname(file_path) == self.path
+            else:
+                return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 499a17d..215e80e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
 import time
@@ -8,10 +9,12 @@ from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
-    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
-    ConflictingPathCollectionError, MissingValueCollectionError,
-    RelativePathCollectionError, RelativePathError)
+from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
+                                                    CollectionConfigParsingError,
+                                                    ConflictingPathCollectionError,
+                                                    MissingValueCollectionError,
+                                                    RelativePathCollectionError,
+                                                    RelativePathError)
 from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
@@ -23,6 +26,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
+                 s3: bool = False,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -32,7 +36,11 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-        self._observer = Observer()
+
+        if s3:
+            self._observer = S3Observer('nexus-ingest')
+        else:
+            self._observer = Observer()
 
         self._granule_watches = set()
 
@@ -47,7 +55,11 @@ class CollectionWatcher:
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-        self._observer.start()
+
+        if type(self._observer) == S3Observer:
+            await self._observer.start()
+        else:
+            self._observer.start()
 
     def _collections(self) -> Set[Collection]:
         """
@@ -130,7 +142,10 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
+                if type(self._observer) == S3Observer:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                else:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 9a86d1e..7720432 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -48,7 +48,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self, path: str, event_handler):
+    def schedule(self,event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -116,8 +116,8 @@ class S3Observer:
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
     handler = Handler()
-    observer.schedule('avhrr/2012', handler)
-    observer.schedule('avhrr/2013', handler)
+    observer.schedule(handler, 'avhrr/2012')
+    observer.schedule(handler, 'avhrr/2013')
 
     await observer.start()