You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/10/30 21:53:22 UTC

[incubator-sdap-ingester] branch s3-support updated: Pass s3-bucket at startup

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/s3-support by this push:
     new d97f5cb  Pass s3-bucket at startup
d97f5cb is described below

commit d97f5cba8dba3bab2801267ec9572d81aeaf4281
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 30 14:53:05 2020 -0700

    Pass s3-bucket at startup
---
 collection_manager/collection_manager/main.py                 | 11 +++++------
 .../collection_manager/services/CollectionWatcher.py          |  8 ++------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 782c70e..b80ae7c 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -54,6 +54,9 @@ def get_args() -> argparse.Namespace:
                         default='30',
                         metavar="INTERVAL",
                         help='Number of seconds after which to reload the collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.')
 
     return parser.parse_args()
 
@@ -61,12 +64,8 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
-        ENABLE_S3 = True
 
-        if ENABLE_S3:
-            signature_fun = None
-        else:
-            signature_fun = md5sum_from_filepath
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
 
         if options.history_path:
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
@@ -83,7 +82,7 @@ async def main():
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh),
-                                                   s3=ENABLE_S3)
+                                                   s3_bucket=options.s3_bucket)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index abd4a11..b1aaf4e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -27,7 +27,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
-                 s3: bool = False,
+                 s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
             raise RelativePathError("Collections config  path must be an absolute path.")
@@ -37,11 +37,7 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-
-        if s3:
-            self._observer = S3Observer('nexus-ingest', initial_scan=True)
-        else:
-            self._observer = Observer()
+        self._observer = S3Observer(s3_bucket, initial_scan=True) if s3_bucket else Observer()
 
         self._granule_watches = set()