You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:29 UTC

[incubator-sdap-ingester] branch s3-support updated (48f9228 -> 5c66515)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a change to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


 discard 48f9228  Fix boto3 version requirements
 discard 7046617  requirements
 discard 386e79f  Add S3_BUCKET to docker entrypoint
 discard d97f5cb  Pass s3-bucket at startup
 discard bb01225  no need to hardcode aws creds
 discard 4677af8  it works
 discard 100a9bb  fixed scanning weirdness
 discard 200046c  fix signature_fun for s3
 discard 192f01c  Properly scans S3, still needs S3 signature fun
 discard 2eb7aaf  wip
 discard 96caa70  Collection.py should support s3 path schemes
 discard 6afe67f  wip
 discard b431c11  Create S3Observer
 discard d412f90  Move directory scanning out of Collection class
 discard 180ea1e  Make some public methods private, for clarity
     add 72e0f32  SDAP-286: Add processor module to Granule Ingester to properly handle longitudes between 180-360deg (#23)
     new de64c8b  Make some public methods private, for clarity
     new 171225e  Move directory scanning out of Collection class
     new 16a636e  Create S3Observer
     new c5bed69  wip
     new d3fe631  Collection.py should support s3 path schemes
     new 069d9fc  wip
     new 883dddf  Properly scans S3, still needs S3 signature fun
     new 1a15971  fix signature_fun for s3
     new a7e3488  fixed scanning weirdness
     new b87d30e  it works
     new dc519ab  no need to hardcode aws creds
     new 0976c6d  Pass s3-bucket at startup
     new 74afad3  Add S3_BUCKET to docker entrypoint
     new b342b3d  requirements
     new 5c66515  Fix boto3 version requirements

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (48f9228)
            \
             N -- N -- N   refs/heads/s3-support (5c66515)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../services/CollectionProcessor.py                   |  1 +
 granule_ingester/granule_ingester/pipeline/Modules.py |  9 +++++++--
 ...kelvintocelsius.py => Subtract180FromLongitude.py} | 19 +++++++++++++------
 .../granule_ingester/processors/__init__.py           |  1 +
 4 files changed, 22 insertions(+), 8 deletions(-)
 copy granule_ingester/granule_ingester/processors/{kelvintocelsius.py => Subtract180FromLongitude.py} (67%)


[incubator-sdap-ingester] 09/15: fixed scanning weirdness

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit a7e3488093552aacced4a97d67ed717c458cb126
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700

    fixed scanning weirdness
---
 .../services/CollectionWatcher.py                  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
 import asyncio
 from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
 from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
 from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
                                                     CollectionConfigParsingError,
                                                     ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
 
-        if type(self._observer) == S3Observer:
+        if isinstance(self._observer, S3Observer):
             await self._observer.start()
         else:
             self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = os.path.getmtime(granule_path)
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    files_owned = glob(collection.path, recursive=True)
-                    for granule in files_owned:
-                        await self._granule_updated_callback(granule, collection)
-
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                if type(self._observer) == S3Observer:
+                if isinstance(self._observer, S3Observer):
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
                 else:
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
                     if isinstance(event, S3Event):
                         modified_time = event.modified_time
                     else:
-                        modified_time = os.path.getmtime(path)
+                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return


[incubator-sdap-ingester] 06/15: wip

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 069d9fc0c6db2fc48be47b6e555d180e015751f7
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Oct 21 16:41:59 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 12 ++++++++
 .../collection_manager/entities/__init__.py        |  1 +
 collection_manager/collection_manager/main.py      |  3 +-
 .../services/CollectionWatcher.py                  | 33 ++++++++++++----------
 .../collection_manager/services/S3Observer.py      |  2 ++
 collection_manager/requirements.txt                |  3 +-
 6 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index aa700cd..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -5,10 +5,16 @@ from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
 from typing import List, Optional
+from enum import Enum
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
 
+class CollectionStorageType(Enum):
+    LOCAL = 1
+    S3 = 2
+
+
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
@@ -40,6 +46,12 @@ class Collection:
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
+    def storage_type(self):
+        if urlparse(self.path).scheme == 's3':
+            return CollectionStorageType.S3
+        else:
+            return CollectionStorageType.LOCAL
+
     def directory(self):
         if urlparse(self.path).scheme == 's3':
             return self.path
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
 from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3de4fdd..044cb87 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -71,7 +71,8 @@ async def main():
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
-                                                   collections_refresh_interval=int(options.refresh))
+                                                   collections_refresh_interval=int(options.refresh),
+                                                   s3=True)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 215e80e..87b1ac3 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.entities.Collection import CollectionStorageType
 from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
@@ -69,11 +70,15 @@ class CollectionWatcher:
         return {collection for collections in self._collections_by_dir.values() for collection in collections}
 
     def _validate_collection(self, collection: Collection):
-        directory = collection.directory()
-        if not os.path.isabs(directory):
-            raise RelativePathCollectionError(collection=collection)
-        if directory == os.path.dirname(self._collections_path):
-            raise ConflictingPathCollectionError(collection=collection)
+        if collection.storage_type() == CollectionStorageType.S3:
+            # do some S3 path validation here
+            return
+        else:
+            directory = collection.directory()
+            if not os.path.isabs(directory):
+                raise RelativePathCollectionError(collection=collection)
+            if directory == os.path.dirname(self._collections_path):
+                raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -185,18 +190,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_created(self, event):
         super().on_created(event)
-        for collection in self._collections_for_dir:
-            try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
-            except IsADirectoryError:
-                pass
+        self._handle_event(event)
 
     def on_modified(self, event):
         super().on_modified(event)
-        # if os.path.isdir(event.src_path):
-        #     return
-        if type(event) == FileModifiedEvent:
-            for collection in self._collections_for_dir:
+        self._handle_event(event)
+
+    def _handle_event(self, event):
+        for collection in self._collections_for_dir:
+            try:
                 if collection.owns_file(event.src_path):
                     self._loop.create_task(self._callback(event.src_path, collection))
+            except IsADirectoryError:
+                return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 7720432..376a907 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -103,6 +103,8 @@ class S3Observer:
         start = time.perf_counter()
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
+
+            # we need the key without the bucket name
             async for file in bucket.objects.filter(Prefix=path):
                 new_cache[file.key] = await file.last_modified
         end = time.perf_counter()
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 34f1334..3402a73 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -5,4 +5,5 @@ watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
 tenacity==6.2.0
-aioboto3==8.0.5
\ No newline at end of file
+aioboto3==8.0.5
+aiohttp==3.6.2
\ No newline at end of file


[incubator-sdap-ingester] 05/15: Collection.py should support s3 path schemes

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit d3fe631142b0408bbf049c86625b300efd7ef025
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 8 11:20:33 2020 -0700

    Collection.py should support s3 path schemes
---
 collection_manager/collection_manager/entities/Collection.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index b49ecbb..aa700cd 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -41,7 +41,9 @@ class Collection:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
     def directory(self):
-        if os.path.isdir(self.path):
+        if urlparse(self.path).scheme == 's3':
+            return self.path
+        elif os.path.isdir(self.path):
             return self.path
         else:
             return os.path.dirname(self.path)


[incubator-sdap-ingester] 10/15: it works

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit b87d30e16e88d06d65341dc225bf5ee47dcc7256
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 16:51:02 2020 -0700

    it works
---
 collection_manager/collection_manager/main.py      |  2 +-
 .../services/CollectionProcessor.py                |  7 ++++--
 .../services/CollectionWatcher.py                  |  6 +++---
 .../services/history_manager/IngestionHistory.py   | 25 +++++++++-------------
 .../history_manager/SolrIngestionHistory.py        |  2 +-
 5 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3dba6e0..782c70e 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -61,7 +61,7 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
-        ENABLE_S3 = False
+        ENABLE_S3 = True
 
         if ENABLE_S3:
             signature_fun = None
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 89d413b..96c461e 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -24,7 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: int, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -35,7 +35,10 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule,
+                                                                  modified_time,
+                                                                  collection.date_from,
+                                                                  collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1fb6abd..abd4a11 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -121,7 +121,7 @@ class CollectionWatcher:
         start = time.perf_counter()
         for collection in collections:
             for granule_path in glob(collection.path, recursive=True):
-                modified_time = os.path.getmtime(granule_path)
+                modified_time = int(os.path.getmtime(granule_path))
                 await self._granule_updated_callback(granule_path, modified_time, collection)
         logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
 
@@ -207,9 +207,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
             try:
                 if collection.owns_file(path):
                     if isinstance(event, S3Event):
-                        modified_time = event.modified_time
+                        modified_time = int(event.modified_time.timestamp())
                     else:
-                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
+                        modified_time = int(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d901690..7f33c79 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -38,17 +38,16 @@ class GranuleStatus(Enum):
 
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update: float = None
+    _latest_ingested_file_update: int = None
 
-    async def push(self, file_path: str, modified_datetime: datetime):
+    async def push(self, file_path: str, modified_timestamp: int):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        modified_timestamp = int(modified_datetime.timestamp())
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp)
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
@@ -60,7 +59,7 @@ class IngestionHistory(ABC):
 
     async def get_granule_status(self,
                                  file_path: str,
-                                 modified_datetime: datetime,
+                                 modified_timestamp: int,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -77,11 +76,11 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp())
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
 
-        if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_timestamp, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature):
+        elif self._in_time_range(modified_timestamp, date_from, date_to) and not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -127,18 +126,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
+    def _in_time_range(timestamp: int, start_date: datetime = None, end_date: datetime = None):
         """
         :param file: file path as a string
         :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        is_after_from = start_date.timestamp() < date.timestamp() if start_date else True
-        is_before_to = end_date.timestamp() > date.timestamp() if end_date else True
+        is_after_from = int(start_date.timestamp()) < timestamp if start_date else True
+        is_before_to = int(end_date.timestamp()) > timestamp if end_date else True
 
         return is_after_from and is_before_to
-
-    @staticmethod
-    def _signature_from_timestamp(timestamp: float):
-        return str(int(timestamp))
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index ebed073..c6d26a5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -64,7 +64,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': int(self._latest_ingested_file_update)}])
+                'latest_update_l': self._latest_ingested_file_update}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):


[incubator-sdap-ingester] 04/15: wip

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit c5bed698727dde46d591e534b829d79b7e1ec7c4
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 17:20:36 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 16 +++++++-----
 .../services/CollectionWatcher.py                  | 29 ++++++++++++++++------
 .../collection_manager/services/S3Observer.py      |  6 ++---
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 0feba0e..b49ecbb 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,4 +1,5 @@
 import os
+from urllib.parse import urlparse
 from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
@@ -46,10 +47,13 @@ class Collection:
             return os.path.dirname(self.path)
 
     def owns_file(self, file_path: str) -> bool:
-        if os.path.isdir(file_path):
-            raise IsADirectoryError()
-
-        if os.path.isdir(self.path):
-            return os.path.dirname(file_path) == self.path
+        if urlparse(file_path).scheme == 's3':
+            return file_path.find(self.path) == 0
         else:
-            return fnmatch(file_path, self.path)
+            if os.path.isdir(file_path):
+                raise IsADirectoryError()
+
+            if os.path.isdir(self.path):
+                return os.path.dirname(file_path) == self.path
+            else:
+                return fnmatch(file_path, self.path)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 499a17d..215e80e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
 import time
@@ -8,10 +9,12 @@ from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import (
-    CollectionConfigFileNotFoundError, CollectionConfigParsingError,
-    ConflictingPathCollectionError, MissingValueCollectionError,
-    RelativePathCollectionError, RelativePathError)
+from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
+                                                    CollectionConfigParsingError,
+                                                    ConflictingPathCollectionError,
+                                                    MissingValueCollectionError,
+                                                    RelativePathCollectionError,
+                                                    RelativePathError)
 from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
@@ -23,6 +26,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
+                 s3: bool = False,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -32,7 +36,11 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-        self._observer = Observer()
+
+        if s3:
+            self._observer = S3Observer('nexus-ingest')
+        else:
+            self._observer = Observer()
 
         self._granule_watches = set()
 
@@ -47,7 +55,11 @@ class CollectionWatcher:
         await self._run_periodically(loop=loop,
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
-        self._observer.start()
+
+        if type(self._observer) == S3Observer:
+            await self._observer.start()
+        else:
+            self._observer.start()
 
     def _collections(self) -> Set[Collection]:
         """
@@ -130,7 +142,10 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
+                if type(self._observer) == S3Observer:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                else:
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 9a86d1e..7720432 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -48,7 +48,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self, path: str, event_handler):
+    def schedule(self,event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -116,8 +116,8 @@ class S3Observer:
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
     handler = Handler()
-    observer.schedule('avhrr/2012', handler)
-    observer.schedule('avhrr/2013', handler)
+    observer.schedule(handler, 'avhrr/2012')
+    observer.schedule(handler, 'avhrr/2013')
 
     await observer.start()
 


[incubator-sdap-ingester] 15/15: Fix boto3 version requirements

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 5c66515b1d0f35fa60d2252c25a9947b9f060524
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Nov 3 16:44:07 2020 -0800

    Fix boto3 version requirements
---
 collection_manager/docker/Dockerfile    | 1 +
 granule_ingester/conda-requirements.txt | 1 -
 granule_ingester/docker/Dockerfile      | 2 ++
 granule_ingester/requirements.txt       | 5 +++--
 4 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index 4e72ff5..83e94ad 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -14,6 +14,7 @@ COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
 RUN cd /common && python setup.py install 
 RUN cd /collection_manager && python setup.py install
+
 RUN pip install boto3==1.16.10
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index da92b1e..810e278 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -7,5 +7,4 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 57bacff..1e7aedd 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -18,6 +18,8 @@ RUN ./install_nexusproto.sh
 RUN cd /common && python setup.py install
 RUN cd /sdap && python setup.py install
 
+RUN pip install boto3==1.16.10
+
 RUN apk del .build-deps
 
 ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 9b06860..d82e6ce 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,6 +1,7 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
-aioboto3
+aioboto3==8.0.5
 tblib==1.6.0
 pysolr==3.9.0
-kazoo==2.8.0
\ No newline at end of file
+kazoo==2.8.0
+aio-pika==6.7.1
\ No newline at end of file


[incubator-sdap-ingester] 13/15: Add S3_BUCKET to docker entrypoint

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 74afad3fc959aaebded51cd2e1d2b08129c4a57b
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 30 14:55:02 2020 -0700

    Add S3_BUCKET to docker entrypoint
---
 collection_manager/docker/entrypoint.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh
index 988dd2c..cad304a 100644
--- a/collection_manager/docker/entrypoint.sh
+++ b/collection_manager/docker/entrypoint.sh
@@ -7,5 +7,6 @@ python /collection_manager/collection_manager/main.py \
   $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \
-  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH)
-  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH)
+  $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) \
+  $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) \
+  $([[ ! -z "$S3_BUCKET" ]] && echo --s3-bucket=$S3_BUCKET)


[incubator-sdap-ingester] 03/15: Create S3Observer

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 16a636e7df5a0968a5c643c611835bb29cf38830
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 6 13:15:23 2020 -0700

    Create S3Observer
---
 .../services/CollectionWatcher.py                  |  14 +--
 .../collection_manager/services/S3Observer.py      | 140 +++++++++++++++++++++
 .../collection_manager/services/__init__.py        |   1 +
 collection_manager/requirements.txt                |   3 +-
 4 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 20ec7c7..499a17d 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -12,7 +12,7 @@ from collection_manager.entities.exceptions import (
     CollectionConfigFileNotFoundError, CollectionConfigParsingError,
     ConflictingPathCollectionError, MissingValueCollectionError,
     RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
 logger = logging.getLogger(__name__)
@@ -179,9 +179,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_modified(self, event):
         super().on_modified(event)
-        if os.path.isdir(event.src_path):
-            return
-
-        for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+        # if os.path.isdir(event.src_path):
+        #     return
+        if type(event) == FileModifiedEvent:
+            for collection in self._collections_for_dir:
+                if collection.owns_file(event.src_path):
+                    self._loop.create_task(self._callback(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..9a86d1e
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,140 @@
+import asyncio
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
+os.environ['AWS_PROFILE'] = "saml-pub"
+os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
+
+
+@dataclass
+class S3Event:
+    src_path: str
+
+
+class S3FileModifiedEvent(S3Event):
+    pass
+
+
+class S3FileCreatedEvent(S3Event):
+    pass
+
+
+class S3Watch(object):
+    def __init__(self, path: str, event_handler) -> None:
+        self.path = path
+        self.event_handler = event_handler
+
+
+class S3Observer:
+
+    def __init__(self, bucket, initial_scan=False) -> None:
+        self._bucket = bucket
+        self._cache: Dict[str, datetime.datetime] = {}
+        self._initial_scan = initial_scan
+        self._watches: Set[S3Watch] = set()
+
+        self._has_polled = False
+
+    async def start(self):
+        await self._run_periodically(loop=None,
+                                     wait_time=30,
+                                     func=self._poll)
+
+    def unschedule(self, watch: S3Watch):
+        self._watches.remove(watch)
+
+    def schedule(self, path: str, event_handler):
+        watch = S3Watch(path=path, event_handler=event_handler)
+        self._watches.add(watch)
+        return watch
+
+    @classmethod
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[[any], Awaitable],
+                                *args,
+                                **kwargs):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the async function that will be awaited
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
+    async def _poll(self):
+        new_cache = {}
+        watch_index = {}
+
+        for watch in self._watches:
+            new_cache_for_watch = await self._get_s3_files(watch.path)
+            new_index = {file: watch for file in new_cache_for_watch}
+
+            new_cache = {**new_cache, **new_cache_for_watch}
+            watch_index = {**watch_index, **new_index}
+        difference = set(new_cache.items()) - set(self._cache.items())
+
+        if self._has_polled or self._initial_scan:
+            for (file, modified_date) in difference:
+                watch = watch_index[file]
+                file_is_new = file not in self._cache
+
+                if file_is_new:
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                else:
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+
+        self._cache = new_cache
+        self._has_polled = True
+
+    async def _get_s3_files(self, path: str):
+        new_cache = {}
+
+        start = time.perf_counter()
+        async with aioboto3.resource("s3") as s3:
+            bucket = await s3.Bucket(self._bucket)
+            async for file in bucket.objects.filter(Prefix=path):
+                new_cache[file.key] = await file.last_modified
+        end = time.perf_counter()
+        duration = end - start
+
+        print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+        return new_cache
+
+
+async def test():
+    observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+    handler = Handler()
+    observer.schedule('avhrr/2012', handler)
+    observer.schedule('avhrr/2013', handler)
+
+    await observer.start()
+
+    while True:
+        try:
+            await asyncio.sleep(1)
+        except KeyboardInterrupt:
+            return
+
+
+class Handler:
+    def on_created(self, event: S3Event):
+        print(f"File created: {event.src_path}")
+
+    def on_modified(self, event: S3Event):
+        print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+    asyncio.run(test())
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
 from .CollectionProcessor import CollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index ee12c89..34f1334 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,4 +4,5 @@ pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
\ No newline at end of file


[incubator-sdap-ingester] 02/15: Move directory scanning out of Collection class

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 171225e6dec6827d806b3208ffa0d9fb0941b7a0
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 2 13:33:46 2020 -0700

    Move directory scanning out of Collection class
---
 .../collection_manager/entities/Collection.py          |  3 ---
 collection_manager/collection_manager/main.py          |  1 -
 .../collection_manager/services/CollectionProcessor.py | 18 ++++++------------
 .../collection_manager/services/CollectionWatcher.py   |  8 +++++---
 .../services/history_manager/IngestionHistory.py       | 17 +++++++++--------
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 031a3a9..0feba0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -53,6 +53,3 @@ class Collection:
             return os.path.dirname(file_path) == self.path
         else:
             return fnmatch(file_path, self.path)
-
-    def files_owned(self) -> List[str]:
-        return glob(self.path, recursive=True)
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index cbe22f9..3de4fdd 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -70,7 +70,6 @@ async def main():
             collection_processor = CollectionProcessor(message_publisher=publisher,
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
-                                                   collection_updated_callback=collection_processor.process_collection,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh))
 
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 975f50c..ab8ce95 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,12 +1,15 @@
 import logging
 import os.path
+from glob import glob
 from typing import Dict
-import yaml
 
+import yaml
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
-from collection_manager.services.history_manager import IngestionHistory, GranuleStatus
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
+from collection_manager.services.history_manager import (GranuleStatus,
+                                                         IngestionHistory)
+from collection_manager.services.history_manager.IngestionHistory import \
+    IngestionHistoryBuilder
 
 logger = logging.getLogger(__name__)
 
@@ -20,15 +23,6 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_collection(self, collection: Collection):
-        """
-        Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
-        :param collection: A Collection definition
-        :return: None
-        """
-        for granule in collection.files_owned():
-            await self.process_granule(granule, collection)
-
     async def process_granule(self, granule: str, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1c7c1be..20ec7c7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -3,6 +3,7 @@ import logging
 import os
 import time
 from collections import defaultdict
+from glob import glob
 from typing import Awaitable, Callable, Dict, Optional, Set
 
 import yaml
@@ -21,14 +22,12 @@ logger.setLevel(logging.DEBUG)
 class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
-                 collection_updated_callback: Callable[[Collection], Awaitable],
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
 
         self._collections_path = collections_path
-        self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
         self._collections_refresh_interval = collections_refresh_interval
 
@@ -107,7 +106,10 @@ class CollectionWatcher:
                 logger.info(f"Scanning files for {len(updated_collections)} collections...")
                 start = time.perf_counter()
                 for collection in updated_collections:
-                    await self._collection_updated_callback(collection)
+                    files_owned = glob(collection.path, recursive=True)
+                    for granule in files_owned:
+                        await self._granule_updated_callback(granule, collection)
+
                 logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
 
                 self._unschedule_watches()
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index b71c32f..231d179 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -48,10 +48,11 @@ class IngestionHistory(ABC):
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
+        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = os.path.getmtime(file_path)
+            self._latest_ingested_file_update = file_modified_date
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path))
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
 
         await self._save_latest_timestamp()
 
@@ -73,9 +74,10 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
+        file_modified_date = os.path.getmtime(file_path)
+        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -114,15 +116,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(file, date_from: datetime = None, date_to: datetime = None):
+    def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
         """
         :param file: file path as a string
         :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        file_modified_time = os.path.getmtime(file)
-        is_after_from = date_from.timestamp() < file_modified_time if date_from else True
-        is_before_to = date_to.timestamp() > file_modified_time if date_to else True
+        is_after_from = start_date.timestamp() < date if start_date else True
+        is_before_to = end_date.timestamp() > date if end_date else True
 
         return is_after_from and is_before_to


[incubator-sdap-ingester] 12/15: Pass s3-bucket at startup

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 0976c6d51f3e50e636585f6977a20c0b5f58bd58
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 30 14:53:05 2020 -0700

    Pass s3-bucket at startup
---
 collection_manager/collection_manager/main.py                 | 11 +++++------
 .../collection_manager/services/CollectionWatcher.py          |  8 ++------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 782c70e..b80ae7c 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -54,6 +54,9 @@ def get_args() -> argparse.Namespace:
                         default='30',
                         metavar="INTERVAL",
                         help='Number of seconds after which to reload the collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.')
 
     return parser.parse_args()
 
@@ -61,12 +64,8 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
-        ENABLE_S3 = True
 
-        if ENABLE_S3:
-            signature_fun = None
-        else:
-            signature_fun = md5sum_from_filepath
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
 
         if options.history_path:
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
@@ -83,7 +82,7 @@ async def main():
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh),
-                                                   s3=ENABLE_S3)
+                                                   s3_bucket=options.s3_bucket)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index abd4a11..b1aaf4e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -27,7 +27,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
-                 s3: bool = False,
+                 s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -37,11 +37,7 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-
-        if s3:
-            self._observer = S3Observer('nexus-ingest', initial_scan=True)
-        else:
-            self._observer = Observer()
+        self._observer = S3Observer(s3_bucket, initial_scan=True) if s3_bucket else Observer()
 
         self._granule_watches = set()
 


[incubator-sdap-ingester] 07/15: Properly scans S3, still needs S3 signature fun

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 883dddf75350a08953d4fb16715b4e404ece6d8a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 22 16:38:08 2020 -0700

    Properly scans S3, still needs S3 signature fun
---
 .../services/CollectionProcessor.py                |  7 ++---
 .../services/CollectionWatcher.py                  | 14 +++++++---
 .../collection_manager/services/S3Observer.py      | 18 ++++++++-----
 .../services/history_manager/IngestionHistory.py   | 31 ++++++++++++++--------
 .../tests/services/test_S3Observer.py              |  8 ++++++
 5 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ab8ce95..89d413b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -2,6 +2,7 @@ import logging
 import os.path
 from glob import glob
 from typing import Dict
+from datetime import datetime
 
 import yaml
 from collection_manager.entities import Collection
@@ -23,7 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_granule(self, granule: str, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -34,7 +35,7 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
@@ -54,7 +55,7 @@ class CollectionProcessor:
 
         dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
-        await history_manager.push(granule)
+        await history_manager.push(granule, modified_time)
 
     @staticmethod
     def _file_supported(file_path: str):
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 87b1ac3..f885b1c 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import asyncio
+from datetime import datetime
 from collection_manager.entities.Collection import CollectionStorageType
-from collection_manager.services.S3Observer import S3Observer
+from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
@@ -39,7 +40,7 @@ class CollectionWatcher:
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
 
         if s3:
-            self._observer = S3Observer('nexus-ingest')
+            self._observer = S3Observer('nexus-ingest', initial_scan=True)
         else:
             self._observer = Observer()
 
@@ -197,9 +198,14 @@ class _GranuleEventHandler(FileSystemEventHandler):
         self._handle_event(event)
 
     def _handle_event(self, event):
+        path = event.src_path
         for collection in self._collections_for_dir:
             try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
+                if collection.owns_file(path):
+                    if isinstance(event, S3Event):
+                        modified_time = event.modified_time
+                    else:
+                        modified_time = os.path.getmtime(path)
+                    self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 376a907..d204890 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -1,4 +1,5 @@
 import asyncio
+from urllib.parse import urlparse
 import datetime
 import os
 import time
@@ -14,6 +15,7 @@ os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
 @dataclass
 class S3Event:
     src_path: str
+    modified_time: datetime.datetime
 
 
 class S3FileModifiedEvent(S3Event):
@@ -48,7 +50,7 @@ class S3Observer:
     def unschedule(self, watch: S3Watch):
         self._watches.remove(watch)
 
-    def schedule(self,event_handler, path: str):
+    def schedule(self, event_handler, path: str):
         watch = S3Watch(path=path, event_handler=event_handler)
         self._watches.add(watch)
         return watch
@@ -90,9 +92,9 @@ class S3Observer:
                 file_is_new = file not in self._cache
 
                 if file_is_new:
-                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file, modified_time=modified_date))
                 else:
-                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file, modified_time=modified_date))
 
         self._cache = new_cache
         self._has_polled = True
@@ -104,9 +106,9 @@ class S3Observer:
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
 
-            # we need the key without the bucket name
-            async for file in bucket.objects.filter(Prefix=path):
-                new_cache[file.key] = await file.last_modified
+            object_key = S3Observer._get_object_key(path)
+            async for file in bucket.objects.filter(Prefix=object_key):
+                new_cache[f"s3://{file.bucket_name}/{file.key}"] = await file.last_modified
         end = time.perf_counter()
         duration = end - start
 
@@ -114,6 +116,10 @@ class S3Observer:
 
         return new_cache
 
+    def _get_object_key(full_path: str):
+        key = urlparse(full_path).path.strip("/")
+        return key
+
 
 async def test():
     observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index 231d179..ea50ffb 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -1,4 +1,5 @@
 import hashlib
+from urllib.parse import urlparse
 import logging
 import os
 from abc import ABC, abstractmethod
@@ -6,6 +7,8 @@ from datetime import datetime
 from enum import Enum
 from typing import Optional
 
+from botocore.compat import filter_ssl_warnings
+
 logger = logging.getLogger(__name__)
 
 BLOCK_SIZE = 65536
@@ -37,27 +40,26 @@ class IngestionHistory(ABC):
     _signature_fun = None
     _latest_ingested_file_update = None
 
-    async def push(self, file_path: str):
+    async def push(self, file_path: str, modified_time: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         await self._push_record(file_name, signature)
 
-        file_modified_date = os.path.getmtime(file_path)
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = file_modified_date
+            self._latest_ingested_file_update = modified_time 
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, file_modified_date)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
 
         await self._save_latest_timestamp()
 
     async def get_granule_status(self,
                                  file_path: str,
+                                 modified_time: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -74,14 +76,22 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        file_modified_date = os.path.getmtime(file_path)
-        if self._in_time_range(file_modified_date, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_modified_date, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _get_standardized_path(file_path: str):
+        file_path = file_path.strip()
+        # TODO: Why do we need to record the basename of the path, instead of just the full path?
+        # The only reason this is here right now is for backwards compatibility to avoid triggering a full reingestion.
+        if urlparse(file_path).scheme == 's3':
+            return urlparse(file_path).path.strip("/")
+        else:
+            return os.path.basename(file_path)
+
     def _latest_ingested_mtime(self) -> Optional[datetime]:
         """
         Return the modified time of the most recently modified file that was ingested.
@@ -98,8 +108,7 @@ class IngestionHistory(ABC):
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
+        file_name = IngestionHistory._get_standardized_path(file_path)
         signature = self._signature_fun(file_path)
         return signature == await self._get_signature(file_name)
 
diff --git a/collection_manager/tests/services/test_S3Observer.py b/collection_manager/tests/services/test_S3Observer.py
new file mode 100644
index 0000000..3fa49e0
--- /dev/null
+++ b/collection_manager/tests/services/test_S3Observer.py
@@ -0,0 +1,8 @@
+from collection_manager.services import S3Observer
+import unittest
+
+
+class TestS3Observer(unittest.TestCase):
+
+    def test_get_object_key(self):
+        self.assertEqual('test_dir/object.nc', S3Observer._get_object_key('s3://test-bucket/test_dir/object.nc'))


[incubator-sdap-ingester] 11/15: no need to hardcode aws creds

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit dc519abcd7861234226d865db5c6ea02c045bbc7
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 17:37:02 2020 -0700

    no need to hardcode aws creds
---
 collection_manager/collection_manager/services/S3Observer.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index d204890..87458a9 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -8,9 +8,6 @@ from typing import Set, Dict, Optional, Callable, Awaitable
 
 import aioboto3
 
-os.environ['AWS_PROFILE'] = "saml-pub"
-os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
-
 
 @dataclass
 class S3Event:


[incubator-sdap-ingester] 08/15: fix signature_fun for s3

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 1a15971e946950e86769fd3763ced15a2e5bb00d
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 29 15:33:56 2020 -0700

    fix signature_fun for s3
---
 collection_manager/collection_manager/main.py      | 21 +++++++++++----
 .../history_manager/FileIngestionHistory.py        |  3 +--
 .../services/history_manager/IngestionHistory.py   | 30 +++++++++++++---------
 .../history_manager/SolrIngestionHistory.py        |  7 ++---
 4 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 044cb87..3dba6e0 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -3,8 +3,11 @@ import asyncio
 import logging
 import os
 
-from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
-from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder
+from collection_manager.services import (CollectionProcessor,
+                                         CollectionWatcher, MessagePublisher)
+from collection_manager.services.history_manager import (
+    FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
+    md5sum_from_filepath)
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
@@ -58,11 +61,19 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
+        ENABLE_S3 = False
+
+        if ENABLE_S3:
+            signature_fun = None
+        else:
+            signature_fun = md5sum_from_filepath
 
         if options.history_path:
-            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
+            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
+                                                                  signature_fun=signature_fun)
         else:
-            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
+            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url,
+                                                                  signature_fun=signature_fun)
         async with MessagePublisher(host=options.rabbitmq_host,
                                     username=options.rabbitmq_username,
                                     password=options.rabbitmq_password,
@@ -72,7 +83,7 @@ async def main():
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh),
-                                                   s3=True)
+                                                   s3=ENABLE_S3)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index ffa065f..cf92997 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -4,7 +4,6 @@ from pathlib import Path
 
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
 from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logger = logging.getLogger(__name__)
 
@@ -33,7 +32,7 @@ class FileIngestionHistory(IngestionHistory):
         """
         self._dataset_id = dataset_id
         self._history_file_path = os.path.join(history_path, f'{dataset_id}.csv')
-        self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+        self._signature_fun = signature_fun
         self._history_dict = {}
         self._load_history_dict()
 
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ea50ffb..d901690 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -38,28 +38,29 @@ class GranuleStatus(Enum):
 
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update = None
+    _latest_ingested_file_update: float = None
 
-    async def push(self, file_path: str, modified_time: datetime):
+    async def push(self, file_path: str, modified_datetime: datetime):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
+        modified_timestamp = int(modified_datetime.timestamp())
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path)
+        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
-            self._latest_ingested_file_update = modified_time 
+            self._latest_ingested_file_update = modified_timestamp
         else:
-            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_time)
+            self._latest_ingested_file_update = max(self._latest_ingested_file_update, modified_timestamp)
 
         await self._save_latest_timestamp()
 
     async def get_granule_status(self,
                                  file_path: str,
-                                 modified_time: datetime,
+                                 modified_datetime: datetime,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -76,9 +77,11 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(modified_time, start_date=self._latest_ingested_mtime()):
+        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp())
+
+        if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(modified_time, date_from, date_to) and not await self._already_ingested(file_path):
+        elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -102,14 +105,13 @@ class IngestionHistory(ABC):
         else:
             return None
 
-    async def _already_ingested(self, file_path: str) -> bool:
+    async def _already_ingested(self, file_path: str, signature) -> bool:
         """
         Return a boolean indicating whether the specified file has already been ingested, based on its signature.
         :param file_path: The full path of a file to search for in the history.
         :return: A boolean indicating whether this file has already been ingested or not
         """
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path)
         return signature == await self._get_signature(file_name)
 
     @abstractmethod
@@ -132,7 +134,11 @@ class IngestionHistory(ABC):
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        is_after_from = start_date.timestamp() < date if start_date else True
-        is_before_to = end_date.timestamp() > date if end_date else True
+        is_after_from = start_date.timestamp() < date.timestamp() if start_date else True
+        is_before_to = end_date.timestamp() > date.timestamp() if end_date else True
 
         return is_after_from and is_before_to
+
+    @staticmethod
+    def _signature_from_timestamp(timestamp: float):
+        return str(int(timestamp))
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 59f5cd7..ebed073 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -3,11 +3,8 @@ import logging
 
 import pysolr
 import requests
-
+from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
 from common.async_utils.AsyncUtils import run_in_executor
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistory
-from collection_manager.services.history_manager.IngestionHistory import IngestionHistoryBuilder
-from collection_manager.services.history_manager.IngestionHistory import md5sum_from_filepath
 
 logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
@@ -40,7 +37,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
             self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
-            self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
+            self._signature_fun = signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
         except requests.exceptions.RequestException:
             raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")


[incubator-sdap-ingester] 14/15: requirements

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit b342b3dc8d34176b14a66a87b9aa0759c147d513
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Mon Nov 2 16:28:20 2020 -0800

    requirements
---
 collection_manager/docker/Dockerfile | 3 ++-
 collection_manager/requirements.txt  | 4 ++--
 collection_manager/setup.py          | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/collection_manager/docker/Dockerfile b/collection_manager/docker/Dockerfile
index 2a57784..4e72ff5 100644
--- a/collection_manager/docker/Dockerfile
+++ b/collection_manager/docker/Dockerfile
@@ -12,7 +12,8 @@ COPY collection_manager/requirements.txt /collection_manager/requirements.txt
 COPY collection_manager/README.md /collection_manager/README.md
 COPY collection_manager/docker/entrypoint.sh /entrypoint.sh
 
-RUN cd /common && python setup.py install
+RUN cd /common && python setup.py install 
 RUN cd /collection_manager && python setup.py install
+RUN pip install boto3==1.16.10
 
 ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 3402a73..c4b6323 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -3,7 +3,7 @@ pystache==0.5.4
 pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
-aio-pika==6.6.1
 tenacity==6.2.0
 aioboto3==8.0.5
-aiohttp==3.6.2
\ No newline at end of file
+aiohttp==3.7.2
+aio-pika==6.7.1
\ No newline at end of file
diff --git a/collection_manager/setup.py b/collection_manager/setup.py
index 0616d0f..e1178f8 100644
--- a/collection_manager/setup.py
+++ b/collection_manager/setup.py
@@ -29,7 +29,7 @@ setuptools.setup(
         "Operating System :: OS Independent",
         "Development Status :: 4 - Beta",
     ],
-    python_requires='>=3.6',
+    python_requires='>=3.8',
     include_package_data=True,
     install_requires=pip_requirements
 )


[incubator-sdap-ingester] 01/15: Make some public methods private, for clarity

Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit de64c8bb70764008b163927970a319e83a5cc21b
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Thu Oct 1 18:20:22 2020 -0700

    Make some public methods private, for clarity
---
 .../services/history_manager/IngestionHistory.py   | 46 +++++++++++-----------
 .../history_manager/test_FileIngestionHistory.py   |  6 +--
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index ef73ccb..b71c32f 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -55,27 +55,6 @@ class IngestionHistory(ABC):
 
         await self._save_latest_timestamp()
 
-    def latest_ingested_mtime(self) -> Optional[datetime]:
-        """
-        Return the modified time of the most recently modified file that was ingested.
-        :return: A datetime or None
-        """
-        if self._latest_ingested_file_update:
-            return datetime.fromtimestamp(self._latest_ingested_file_update)
-        else:
-            return None
-
-    async def already_ingested(self, file_path: str) -> bool:
-        """
-        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
-        :param file_path: The full path of a file to search for in the history.
-        :return: A boolean indicating whether this file has already been ingested or not
-        """
-        file_path = file_path.strip()
-        file_name = os.path.basename(file_path)
-        signature = self._signature_fun(file_path)
-        return signature == await self._get_signature(file_name)
-
     async def get_granule_status(self,
                                  file_path: str,
                                  date_from: datetime = None,
@@ -94,13 +73,34 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        if self._in_time_range(file_path, date_from=self.latest_ingested_mtime()):
+        if self._in_time_range(file_path, date_from=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(file_path, date_from, date_to) and not await self.already_ingested(file_path):
+        elif self._in_time_range(file_path, date_from, date_to) and not await self._already_ingested(file_path):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
 
+    def _latest_ingested_mtime(self) -> Optional[datetime]:
+        """
+        Return the modified time of the most recently modified file that was ingested.
+        :return: A datetime or None
+        """
+        if self._latest_ingested_file_update:
+            return datetime.fromtimestamp(self._latest_ingested_file_update)
+        else:
+            return None
+
+    async def _already_ingested(self, file_path: str) -> bool:
+        """
+        Return a boolean indicating whether the specified file has already been ingested, based on its signature.
+        :param file_path: The full path of a file to search for in the history.
+        :return: A boolean indicating whether this file has already been ingested or not
+        """
+        file_path = file_path.strip()
+        file_name = os.path.basename(file_path)
+        signature = self._signature_fun(file_path)
+        return signature == await self._get_signature(file_name)
+
     @abstractmethod
     async def _save_latest_timestamp(self):
         pass
diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
index 07ab0e1..8bd939e 100644
--- a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
+++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -36,7 +36,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -47,7 +47,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
             await ingestion_history.push(str(current_file_path))
-            self.assertTrue(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertTrue(await ingestion_history._already_ingested(str(current_file_path)))
 
             del ingestion_history
 
@@ -57,7 +57,7 @@ class TestFileIngestionHistory(unittest.TestCase):
             ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
             # history_manager with this file
             current_file_path = pathlib.Path(__file__)
-            self.assertFalse(await ingestion_history.already_ingested(str(current_file_path)))
+            self.assertFalse(await ingestion_history._already_ingested(str(current_file_path)))
 
 
 if __name__ == '__main__':