Posted to commits@sdap.apache.org by ea...@apache.org on 2020/11/04 01:11:41 UTC

[incubator-sdap-ingester] 12/15: Pass s3-bucket at startup

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 0976c6d51f3e50e636585f6977a20c0b5f58bd58
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Fri Oct 30 14:53:05 2020 -0700

    Pass s3-bucket at startup
---
 collection_manager/collection_manager/main.py                 | 11 +++++------
 .../collection_manager/services/CollectionWatcher.py          |  8 ++------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 782c70e..b80ae7c 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -54,6 +54,9 @@ def get_args() -> argparse.Namespace:
                         default='30',
                         metavar="INTERVAL",
                         help='Number of seconds after which to reload the collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.')
 
     return parser.parse_args()
 
@@ -61,12 +64,8 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
-        ENABLE_S3 = True
 
-        if ENABLE_S3:
-            signature_fun = None
-        else:
-            signature_fun = md5sum_from_filepath
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
 
         if options.history_path:
             history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
@@ -83,7 +82,7 @@ async def main():
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
                                                    collections_refresh_interval=int(options.refresh),
-                                                   s3=ENABLE_S3)
+                                                   s3_bucket=options.s3_bucket)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index abd4a11..b1aaf4e 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -27,7 +27,7 @@ class CollectionWatcher:
     def __init__(self,
                  collections_path: str,
                  granule_updated_callback: Callable[[str, Collection], Awaitable],
-                 s3: bool = False,
+                 s3_bucket: Optional[str] = None,
                  collections_refresh_interval: float = 30):
         if not os.path.isabs(collections_path):
            raise RelativePathError("Collections config path must be an absolute path.")
@@ -37,11 +37,7 @@ class CollectionWatcher:
         self._collections_refresh_interval = collections_refresh_interval
 
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
-
-        if s3:
-            self._observer = S3Observer('nexus-ingest', initial_scan=True)
-        else:
-            self._observer = Observer()
+        self._observer = S3Observer(s3_bucket, initial_scan=True) if s3_bucket else Observer()
 
         self._granule_watches = set()
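
A minimal sketch of the pattern the updated main.py now follows: the optional --s3-bucket argument, rather than a hard-coded ENABLE_S3 constant, decides whether granule signatures are computed from local files. Only the argparse option and the conditional come from this commit; the inline md5sum_from_filepath below is a stand-in for the helper main.py actually imports.

    import argparse
    import hashlib
    from typing import Callable, Optional

    def md5sum_from_filepath(path: str) -> str:
        # Stand-in for the helper imported in main.py: hash a local granule file.
        with open(path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    def get_args() -> argparse.Namespace:
        parser = argparse.ArgumentParser()
        parser.add_argument('--s3-bucket',
                            metavar='S3-BUCKET',
                            help='Optional name of an AWS S3 bucket where granules are stored.')
        return parser.parse_args()

    options = get_args()
    # Granules stored on S3 have no local path to hash, so no signature function is used.
    signature_fun: Optional[Callable[[str], str]] = None if options.s3_bucket else md5sum_from_filepath

Collapsing the old if/else into a single conditional expression keeps the S3 switch in one place and removes the temporary ENABLE_S3 constant.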
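CollectionWatcher applies the same idea: instead of a boolean s3 flag plus a hard-coded 'nexus-ingest' bucket, the optional bucket name both enables S3 watching and names the bucket to scan. A condensed sketch, assuming the local observer comes from the watchdog package and using a hypothetical import path for the S3Observer introduced on the s3-support branch:

    from typing import Optional

    from watchdog.observers import Observer  # assumed source of the local-filesystem Observer
    # Hypothetical import path for the repository's S3Observer.
    from collection_manager.services.S3Observer import S3Observer

    def build_observer(s3_bucket: Optional[str] = None):
        # A bucket name means granules live on S3: scan the bucket (with an
        # initial full scan) instead of watching the local filesystem.
        return S3Observer(s3_bucket, initial_scan=True) if s3_bucket else Observer()

    # build_observer('my-granule-bucket') would watch that bucket;
    # build_observer() falls back to the local watchdog Observer.

Passing the bucket through the constructor means the watcher no longer assumes a fixed bucket name, which is what allows --s3-bucket to be chosen per deployment at startup.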