You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/06/22 18:59:29 UTC
[incubator-sdap-ingester] 01/02: fixed bugs
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch bug_fixes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 33d302ccff792a28f0462924811246f2565658a8
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jun 22 11:59:02 2020 -0700
fixed bugs
---
.../entities/exceptions/Exceptions.py | 2 +-
.../collection_manager/entities/exceptions/__init__.py | 2 +-
collection_manager/collection_manager/main.py | 18 ++++++++----------
.../collection_manager/services/CollectionProcessor.py | 6 +++---
.../collection_manager/services/CollectionWatcher.py | 18 +++++++++++++-----
collection_manager/tests/resources/collections_bad.yml | 17 -----------------
.../tests/services/test_CollectionProcessor.py | 6 +++---
.../tests/services/test_CollectionWatcher.py | 12 +++++++++---
8 files changed, 38 insertions(+), 43 deletions(-)
diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
index 8e63d24..c18c4c8 100644
--- a/collection_manager/collection_manager/entities/exceptions/Exceptions.py
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -2,7 +2,7 @@ class RelativePathError(Exception):
pass
-class YamlParsingError(Exception):
+class CollectionConfigParsingError(Exception):
pass
diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py
index 9a22c16..7fac507 100644
--- a/collection_manager/collection_manager/entities/exceptions/__init__.py
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -3,4 +3,4 @@ from .Exceptions import ConflictingPathCollectionError
from .Exceptions import MissingValueCollectionError
from .Exceptions import RelativePathCollectionError
from .Exceptions import RelativePathError
-from .Exceptions import YamlParsingError
+from .Exceptions import CollectionConfigParsingError
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index a10446f..296cb12 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -8,7 +8,7 @@ from collection_manager.services.history_manager import SolrIngestionHistoryBuil
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.WARNING)
-logger = logging.getLogger("collection_manager")
+logger = logging.getLogger()
def check_path(path) -> str:
@@ -19,25 +19,23 @@ def check_path(path) -> str:
def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run ingestion for a list of collection ingestion streams")
- parser.add_argument("--refresh",
- help="refresh interval in seconds to check for new or updated granules",
- default=300)
- parser.add_argument("--collections",
+ parser.add_argument("--collections-path",
help="Absolute path to collections configuration file",
+ metavar="PATH",
required=True)
- parser.add_argument('--rabbitmq_host',
+ parser.add_argument('--rabbitmq-host',
default='localhost',
metavar='HOST',
help='RabbitMQ hostname to connect to. (Default: "localhost")')
- parser.add_argument('--rabbitmq_username',
+ parser.add_argument('--rabbitmq-username',
default='guest',
metavar='USERNAME',
help='RabbitMQ username. (Default: "guest")')
- parser.add_argument('--rabbitmq_password',
+ parser.add_argument('--rabbitmq-password',
default='guest',
metavar='PASSWORD',
help='RabbitMQ password. (Default: "guest")')
- parser.add_argument('--rabbitmq_queue',
+ parser.add_argument('--rabbitmq-queue',
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
@@ -65,7 +63,7 @@ def main():
publisher.connect()
collection_processor = CollectionProcessor(message_publisher=publisher,
history_manager_builder=history_manager_builder)
- collection_watcher = CollectionWatcher(collections_path=options.collections,
+ collection_watcher = CollectionWatcher(collections_path=options.collections_path,
collection_updated_callback=collection_processor.process_collection,
granule_updated_callback=collection_processor.process_granule)
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index a81390b..232cdee 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -63,7 +63,7 @@ class CollectionProcessor:
f"time range for collection '{collection.dataset_id}'. Skipping.")
return
- dataset_config = self._fill_template(collection, config_template=self._config_template)
+ dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
self._publisher.publish_message(body=dataset_config, priority=use_priority)
history_manager.push(granule)
@@ -78,11 +78,11 @@ class CollectionProcessor:
return self._history_manager_cache[dataset_id]
@staticmethod
- def _fill_template(collection: Collection, config_template: str) -> str:
+ def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str:
renderer = pystache.Renderer()
config_content = renderer.render(config_template,
{
- 'granule': collection.path,
+ 'granule': granule_path,
'dataset_id': collection.dataset_id,
'variable': collection.variable
})
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 6bbe7d9..19a089b 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -9,7 +9,7 @@ from watchdog.observers import Observer
from yaml.scanner import ScannerError
from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \
+from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \
CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \
RelativePathCollectionError
@@ -84,8 +84,12 @@ class CollectionWatcher:
raise CollectionConfigFileNotFoundError("The collection config file could not be found at "
f"{self._collections_path}")
except yaml.scanner.ScannerError:
- raise YamlParsingError("Bad YAML syntax in collection configuration file. Will attempt to reload "
- "collections after the next configuration change.")
+ raise CollectionConfigParsingError("Bad YAML syntax in collection configuration file. Will attempt "
+ "to reload collections after the next configuration change.")
+ except KeyError:
+ raise CollectionConfigParsingError("The collections configuration YAML file does not conform to the "
+ "proper schema. Will attempt to reload collections config after the "
+ "next file modification.")
def _get_updated_collections(self) -> Set[Collection]:
old_collections = self.collections()
@@ -98,7 +102,7 @@ class CollectionWatcher:
self._collection_updated_callback(collection)
self._unschedule_watches()
self._schedule_watches()
- except YamlParsingError as e:
+ except CollectionConfigParsingError as e:
logger.error(e)
def _unschedule_watches(self):
@@ -111,7 +115,11 @@ class CollectionWatcher:
granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
- self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+ try:
+ self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+ except FileNotFoundError:
+ bad_collection_names = ' and '.join([col.dataset_id for col in collections])
+ logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
class _CollectionEventHandler(FileSystemEventHandler):
diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad.yml
deleted file mode 100644
index cac6a32..0000000
--- a/collection_manager/tests/resources/collections_bad.yml
+++ /dev/null
@@ -1,17 +0,0 @@
-collections:
- - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
- path: /opt/data/grace/*land*.nc
- variable: lwe_thickness
- priority: 1
- forward-processing-priority: 5
-BAD SYNTAX!
- - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
- path: /opt/data/grace/*ocean*.nc
- variable: lwe_thickness
- priority: 2
- forward-processing-priority: 6
-
- - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
- path: /opt/data/avhrr/*.nc
- variable: analysed_sst
- priority: 1
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index 7899e22..56d5393 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -46,7 +46,7 @@ class TestCollectionProcessor(unittest.TestCase):
expected = """
granule:
- resource: test_path
+ resource: /granules/test_granule.nc
processors:
- name: GridReadingProcessor
variable_to_read: test_variable
@@ -54,13 +54,13 @@ class TestCollectionProcessor(unittest.TestCase):
dataset_name: test_dataset
"""
collection = Collection(dataset_id="test_dataset",
- path="test_path",
+ path="/granules/test*.nc",
variable="test_variable",
historical_priority=1,
forward_processing_priority=2,
date_from=None,
date_to=None)
- filled = CollectionProcessor._fill_template(collection, template)
+ filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template)
self.assertEqual(filled, expected)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 7ae25a1..8c6ab5f 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -6,7 +6,7 @@ from datetime import datetime
from unittest.mock import Mock
from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError, \
+from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \
RelativePathCollectionError, ConflictingPathCollectionError
from collection_manager.services import CollectionWatcher
@@ -38,10 +38,16 @@ class TestCollectionWatcher(unittest.TestCase):
self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1)
def test_load_collections_with_bad_yaml_syntax(self):
- collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad.yml')
+ collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_syntax.yml')
collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
- self.assertRaises(YamlParsingError, collection_watcher._load_collections)
+ self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections)
+
+ def test_load_collections_with_bad_schema(self):
+ collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_schema.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+ self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections)
def test_load_collections_with_file_not_found(self):
collections_path = os.path.join(os.path.dirname(__file__), '../resources/does_not_exist.yml')