You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/21 23:42:43 UTC

[incubator-sdap-ingester] branch s3-support updated (7251626 -> c56cd91)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard 7251626  wip
 discard a20e85d  Collection.py should support s3 path schemes
 discard 53180f2  wip
 discard 000d93d  Create S3Observer
 discard 6bca5b8  Move directory scanning out of Collection class
 discard 38b254c  Make some public methods private, for clarity
     add 2c4c8bb  SDAP-291: Fix netcdf parsing error by freezing pandas at 1.0.4 (#20)
     new f71f6df  Make some public methods private, for clarity
     new 3c39c30  Move directory scanning out of Collection class
     new e26891a  Create S3Observer
     new 7adb71c  wip
     new 0f51ebb  Collection.py should support s3 path schemes
     new c56cd91  wip

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (`git push --force`) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (7251626)
            \
             N -- N -- N   refs/heads/s3-support (c56cd91)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 granule_ingester/conda-requirements.txt | 1 +
 1 file changed, 1 insertion(+)


[incubator-sdap-ingester] 01/06: Make some public methods private, for clarity

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit f71f6dfa33ee370b6a0b8cf12fdf49934ba59235
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 1 18:20:22 2020 -0700

    Make some public methods private, for clarity
---
 .../services/history_manager/IngestionHistory.py   | 46 +++++++++++-----------
 .../history_manager/test_FileIngestionHistory.py   |  6 +--
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ef73ccb..b71c32f 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -55,27 +55,6 @@ class IngestionHistory(ABC):
 
         await self._save_latest_timestamp()
 
-    def latest_ingested_mtime(self) -> Optional[datetime]:
-        """
-        Return the modified time of the most recently modified file that was ingested.
-        :return: A datetime or None
-        """
-        if self._latest_ingested_file_update:
-            return datetime.fromtimestamp(self._latest_ingested_file_update)
-        else:
-            return None
-
-    async def already_ingested(self, file_path: str) -> bool:
-        """
-        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
-        :param file_path: The full path of a file to search for in the history.
-        :return: A boolean indicating whether this file has already been ingested or not
-        """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
-        return signature == await self._get_signature(file_name)
-
     async def get_granule_status(self,
                                  file_path: str,
                                  date_from: datetime = None,
@@ -94,13 +73,34 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
+        if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _latest_ingested_mtime(self) -> Optional[datetime]:
+        """
+        Return the modified time of the most recently modified file that was ingested.
+        :return: A datetime or None
+        """
+        if self._latest_ingested_file_update:
+            return datetime.fromtimestamp(self._latest_ingested_file_update)
+        else:
+            return None
+
+    async def _already_ingested(self, file_path: str) -> bool:
+        """
+        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
+        :param file_path: The full path of a file to search for in the history.
+        :return: A boolean indicating whether this file has already been ingested or not
+        """
+        file_path = file_path.strip()
+        file_name = os.path.basename(file_path)
+        signature = self._signature_fun(file_path)
+        return signature == await self._get_signature(file_name)
+
     @abstractmethod
     async def _save_latest_timestamp(self):
         pass
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index 07ab0e1..8bd939e 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -36,7 +36,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -47,7 +47,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -57,7 +57,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history._already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':


[incubator-sdap-ingester] 06/06: wip

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit c56cd915a342f57d49912f63555fd93a15093099
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Oct 21 16:41:59 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 12 ++++++++
 .../collection_manager/entities/__init__.py        |  1 +
 collection_manager/collection_manager/main.py      |  3 +-
 .../services/CollectionWatcher.py                  | 33 ++++++++++++----------
 .../collection_manager/services/S3Observer.py      |  2 ++
 collection_manager/requirements.txt                |  3 +-
 6 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index aa700cd..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -5,10 +5,16 @@ from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
 from typing import List, Optional
+from enum import Enum
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
 
+class CollectionStorageType(Enum):
+    LOCAL = 1
+    S3 = 2
+
+
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
@@ -40,6 +46,12 @@ class Collection:
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
+    def storage_type(self):
+        if urlparse(self.path).scheme == 's3':
+            return CollectionStorageType.S3
+        else:
+            return CollectionStorageType.LOCAL
+
     def directory(self):
         if urlparse(self.path).scheme == 's3':
             return self.path
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
 from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3de4fdd..044cb87 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -71,7 +71,8 @@ async def main():
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
-                                                   collections_refresh_interval=int(options.refresh))
+                                                   collections_refresh_interval=int(options.refresh),
+                                                   s3=True)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 215e80e..87b1ac3 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.entities.Collection import CollectionStorageType
 from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
@@ -69,11 +70,15 @@ class CollectionWatcher:
         return {collection for collections in self._collections_by_dir.values() for collection in collections}
 
     def _validate_collection(self, collection: Collection):
-        directory = collection.directory()
-        if not os.path.isabs(directory):
-            raise RelativePathCollectionError(collection=collection)
-        if directory == os.path.dirname(self._collections_path):
-            raise ConflictingPathCollectionError(collection=collection)
+        if collection.storage_type() == CollectionStorageType.S3:
+            # do some S3 path validation here
+            return
+        else:
+            directory = collection.directory()
+            if not os.path.isabs(directory):
+                raise RelativePathCollectionError(collection=collection)
+            if directory == os.path.dirname(self._collections_path):
+                raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -185,18 +190,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_created(self, event):
         super().on_created(event)
-        for collection in self._collections_for_dir:
-            try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
-            except IsADirectoryError:
-                pass
+        self._handle_event(event)
 
     def on_modified(self, event):
         super().on_modified(event)
-        # if os.path.isdir(event.src_path):
-        #     return
-        if type(event) == FileModifiedEvent:
-            for collection in self._collections_for_dir:
+        self._handle_event(event)
+
+    def _handle_event(self, event):
+        for collection in self._collections_for_dir:
+            try:
                 if collection.owns_file(event.src_path):
                     self._loop.create_task(self._callback(event.src_path, collection))
+            except IsADirectoryError:
+                return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 7720432..376a907 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -103,6 +103,8 @@ class S3Observer:
         start = time.perf_counter()
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
+
+            # we need the key without the bucket name
             async for file in bucket.objects.filter(Prefix=path):
                 new_cache[file.key] = await file.last_modified
         end = time.perf_counter()
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 34f1334..3402a73 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -5,4 +5,5 @@ watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
 tenacity==6.2.0
-aioboto3==8.0.5
\ No newline at end of file
+aioboto3==8.0.5
+aiohttp==3.6.2
\ No newline at end of file


[incubator-sdap-ingester] 05/06: Collection.py should support s3 path schemes

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 0f51ebb12242492853d7a77164a977e0656d839f
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 8 11:20:33 2020 -0700

    Collection.py should support s3 path schemes
---
 collection_manager/collection_manager/entities/Collection.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index b49ecbb..aa700cd 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -41,7 +41,9 @@ class Collection:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
     def directory(self):
-        if os.path.isdir(self.path):
+        if urlparse(self.path).scheme == 's3':
+            return self.path
+        elif os.path.isdir(self.path):
             return self.path
         else:
             return os.path.dirname(self.path)


[incubator-sdap-ingester] 04/06: wip

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 7adb71c093bcdb458ac313e6eadde8ce0360fc32
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 17:20:36 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 16 +++++++-----
 .../services/CollectionWatcher.py                  | 29 ++++++++++++++++------
 .../collection_manager/services/S3Observer.py      |  6 ++---
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 0feba0e..b49ecbb 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,4 +1,5 @@
 import os
+from urllib.parse import urlparse
 from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
@@ -46,10 +47,13 @@ class Collection:
             return os.path.dirname(self.path)
 
     def owns_file(self, file_path: str) -> bool:
-        if os.path.isdir(file_path):
-            raise IsADirectoryError()
-
-        if os.path.isdir(self.path):
-            return os.path.dirname(file_path) == self.path
+        if urlparse(file_path).scheme == 's3':
+            return file_path.find(self.path) == 0
         else:
-            return fnmatch(file_path, self.path)
+            if os.path.isdir(file_path):
+                raise IsADirectoryError()
+
+            if os.path.isdir(self.path):
+                return os.path.dirname(file_path) == self.path
+            else:
+                return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 499a17d..215e80e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
 import time
@@ -8,10 +9,12 @@ from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
-    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
-    ConflictingPathCollectionError, MissingValueCollectionError,
-    RelativePathCollectionError, RelativePathError)
+from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
+                                                    CollectionConfigParsingError,
+                                                    ConflictingPathCollectionError,
+                                                    MissingValueCollectionError,
+                                                    RelativePathCollectionError,
+                                                    RelativePathError)
 from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
@@ -23,6 +26,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
+                 s3: bool = False,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -32,7 +36,11 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-        self._observer = Observer()
+
+        if s3:
+            self._observer = S3Observer('nexus-ingest')
+        else:
+            self._observer = Observer()
 
         self._granule_watches = set()
 
@@ -47,7 +55,11 @@ class CollectionWatcher:
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-        self._observer.start()
+
+        if type(self._observer) == S3Observer:
+            await self._observer.start()
+        else:
+            self._observer.start()
 
     def _collections(self) -> Set[Collection]:
         """
@@ -130,7 +142,10 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
+                if type(self._observer) == S3Observer:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                else:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 9a86d1e..7720432 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -48,7 +48,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self, path: str, event_handler):
+    def schedule(self,event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -116,8 +116,8 @@ class S3Observer:
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
     handler = Handler()
-    observer.schedule('avhrr/2012', handler)
-    observer.schedule('avhrr/2013', handler)
+    observer.schedule(handler, 'avhrr/2012')
+    observer.schedule(handler, 'avhrr/2013')
 
     await observer.start()
 


[incubator-sdap-ingester] 03/06: Create S3Observer

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit e26891a5f7b9111909eb167f38d1f1a1ad48a0ed
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 13:15:23 2020 -0700

    Create S3Observer
---
 .../services/CollectionWatcher.py                  |  14 +--
 .../collection_manager/services/S3Observer.py      | 140 +++++++++++++++++++++
 .../collection_manager/services/__init__.py        |   1 +
 collection_manager/requirements.txt                |   3 +-
 4 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 20ec7c7..499a17d 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -12,7 +12,7 @@ from collection_manager.entities.exceptions import (
     CollectionConfigFileNotFoundError, CollectionConfigParsingError,
     ConflictingPathCollectionError, MissingValueCollectionError,
     RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
 logger = logging.getLogger(__name__)
@@ -179,9 +179,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_modified(self, event):
         super().on_modified(event)
-        if os.path.isdir(event.src_path):
-            return
-
-        for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+        # if os.path.isdir(event.src_path):
+        #     return
+        if type(event) == FileModifiedEvent:
+            for collection in self._collections_for_dir:
+                if collection.owns_file(event.src_path):
+                    self._loop.create_task(self._callback(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..9a86d1e
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,140 @@
+import asyncio
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
+os.environ['AWS_PROFILE'] = "saml-pub"
+os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
+
+
+@dataclass
+class S3Event:
+    src_path: str
+
+
+class S3FileModifiedEvent(S3Event):
+    pass
+
+
+class S3FileCreatedEvent(S3Event):
+    pass
+
+
+class S3Watch(object):
+    def __init__(self, path: str, event_handler) -> None:
+        self.path = path
+        self.event_handler = event_handler
+
+
+class S3Observer:
+
+    def __init__(self, bucket, initial_scan=False) -> None:
+        self._bucket = bucket
+        self._cache: Dict[str, datetime.datetime] = {}
+        self._initial_scan = initial_scan
+        self._watches: Set[S3Watch] = set()
+
+        self._has_polled = False
+
+    async def start(self):
+        await self._run_periodically(loop=None,
+                                     wait_time=30,
+                                     func=self._poll)
+
+    def unschedule(self, watch: S3Watch):
+        self._watches.remove(watch)
+
+    def schedule(self, path: str, event_handler):
+        watch = S3Watch(path=path, event_handler=event_handler)
+        self._watches.add(watch)
+        return watch
+
+    @classmethod
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[[any], Awaitable],
+                                *args,
+                                **kwargs):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the async function that will be awaited
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
+    async def _poll(self):
+        new_cache = {}
+        watch_index = {}
+
+        for watch in self._watches:
+            new_cache_for_watch = await self._get_s3_files(watch.path)
+            new_index = {file: watch for file in new_cache_for_watch}
+
+            new_cache = {**new_cache, **new_cache_for_watch}
+            watch_index = {**watch_index, **new_index}
+        difference = set(new_cache.items()) - set(self._cache.items())
+
+        if self._has_polled or self._initial_scan:
+            for (file, modified_date) in difference:
+                watch = watch_index[file]
+                file_is_new = file not in self._cache
+
+                if file_is_new:
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                else:
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+
+        self._cache = new_cache
+        self._has_polled = True
+
+    async def _get_s3_files(self, path: str):
+        new_cache = {}
+
+        start = time.perf_counter()
+        async with aioboto3.resource("s3") as s3:
+            bucket = await s3.Bucket(self._bucket)
+            async for file in bucket.objects.filter(Prefix=path):
+                new_cache[file.key] = await file.last_modified
+        end = time.perf_counter()
+        duration = end - start
+
+        print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+        return new_cache
+
+
+async def test():
+    observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+    handler = Handler()
+    observer.schedule('avhrr/2012', handler)
+    observer.schedule('avhrr/2013', handler)
+
+    await observer.start()
+
+    while True:
+        try:
+            await asyncio.sleep(1)
+        except KeyboardInterrupt:
+            return
+
+
+class Handler:
+    def on_created(self, event: S3Event):
+        print(f"File created: {event.src_path}")
+
+    def on_modified(self, event: S3Event):
+        print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+    asyncio.run(test())
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
 from .CollectionProcessor import CollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index ee12c89..34f1334 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,4 +4,5 @@ pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
\ No newline at end of file


[incubator-sdap-ingester] 02/06: Move directory scanning out of Collection class

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 3c39c301ad69f0654b3194de9a2d917277b8d123
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 2 13:33:46 2020 -0700

    Move directory scanning out of Collection class
---
 .../collection_manager/entities/Collection.py          |  3 ---
 collection_manager/collection_manager/main.py          |  1 -
 .../collection_manager/services/CollectionProcessor.py | 18 ++++++------------
 .../collection_manager/services/CollectionWatcher.py   |  8 +++++---
 .../services/history_manager/IngestionHistory.py       | 17 +++++++++--------
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..0feba0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -53,6 +53,3 @@ class Collection:
             return os.path.dirname(file_path) == self.path
         else:
             return fnmatch(file_path, self.path)
-
-    def files_owned(self) -> List[str]:
-        return glob(self.path, recursive=True)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index cbe22f9..3de4fdd 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -70,7 +70,6 @@ async def main():
             collection_processor = CollectionProcessor(message_publisher=publisher,
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                                   collection_updated_callback=collection_processor.process_collection,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh))
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index f08ade9..eb3bbae 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,15 @@
 import logging
 import os.path
+from glob import glob
 from typing import Dict
-import yaml
 
+import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+                                                         IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+    IngestionHistoryBuilder
 
 logger = logging.getLogger(__name__)
 
@@ -20,15 +23,6 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_collection(self, collection: Collection):
-        """
-        Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
-        :param collection: A Collection definition
-        :return: None
-        """
-        for granule in collection.files_owned():
-            await self.process_granule(granule, collection)
-
     async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..20ec7c7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -3,6 +3,7 @@ import logging
 import os
 import time
 from collections import defaultdict
+from glob import glob
 from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
@@ -21,14 +22,12 @@ logger.setLevel(logging.DEBUG)
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], Awaitable],
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
 
         self._collections_path = collections_path
-        self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
         self._collections_refresh_interval = collections_refresh_interval
 
@@ -107,7 +106,10 @@ class CollectionWatcher:
                 logger.info(f"Scanning files for {len(updated_collections)} collections...")
                 start = time.perf_counter()
                 for collection in updated_collections:
-                    await self._collection_updated_callback(collection)
+                    files_owned = glob(collection.path, recursive=True)
+                    for granule in files_owned:
+                        await self._granule_updated_callback(granule, collection)
+
                 logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
 
                 self._unschedule_watches()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index b71c32f..231d179 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -48,10 +48,11 @@ class IngestionHistory(ABC):
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
+        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = os.path.getmtime(file_path)
+            self._latest_ingested_file_update = file_modified_date
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
 
         await self._save_latest_timestamp()
 
@@ -73,9 +74,10 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
+        file_modified_date = os.path.getmtime(file_path)
+        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -114,15 +116,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(file, date_from: datetime = None, date_to: datetime = None):
+    def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
         """
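         :param date: modification time of the file, as a POSIX timestamp (e.g. the value of os.path.getmtime)
         :param start_date: datetime lower bound (exclusive), can be None
         :param end_date: datetime upper bound (exclusive), can be None
         :return: True if date falls strictly between start_date and end_date. False otherwise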
         """
-        file_modified_time = os.path.getmtime(file)
-        is_after_from = date_from.timestamp() < file_modified_time if date_from else True
-        is_before_to = date_to.timestamp() > file_modified_time if date_to else True
+        is_after_from = start_date.timestamp() < date if start_date else True
+        is_before_to = end_date.timestamp() > date if end_date else True
 
         return is_after_from and is_before_to