Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/25 17:48:03 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-282, SDAP-284: Support configuring dimension names and projection during ingestion, support hierarchical directory structures (#19)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6097580  SDAP-282, SDAP-284: Support configuring dimension names and projection during ingestion, support hierarchical directory structures (#19)
6097580 is described below

commit 60975805db0e9f1088035f1506616b138d0b671b
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 25 10:47:56 2020 -0700

    SDAP-282, SDAP-284: Support configuring dimension names and projection during ingestion, support hierarchical directory structures (#19)
---
 .../collection_manager/entities/Collection.py      | 10 ++-
 .../resources/dataset_config_template.yml          | 20 ------
 .../services/CollectionProcessor.py                | 46 ++++++++-----
 .../services/CollectionWatcher.py                  | 15 +++--
 .../services/MessagePublisher.py                   |  2 +-
 collection_manager/setup.py                        |  1 -
 .../tests/entities/test_Collection.py              | 60 ++++++++++++++---
 collection_manager/tests/resources/collections.yml | 36 +++++++++-
 .../tests/resources/collections_alternate.yml      | 34 +++++++++-
 .../tests/services/test_CollectionProcessor.py     | 78 ++++++++++++++--------
 .../tests/services/test_CollectionWatcher.py       | 63 ++++++++++++++---
 config_operator/tests/resources/collections.yml    | 11 ++-
 .../consumer/{Consumer.py => MessageConsumer.py}   |  4 +-
 .../granule_ingester/consumer/__init__.py          |  2 +-
 .../granule_ingester/exceptions/Exceptions.py      |  7 +-
 .../granule_ingester/exceptions/__init__.py        | 19 +++---
 .../granule_loaders/GranuleLoader.py               |  9 ++-
 granule_ingester/granule_ingester/main.py          | 22 +++---
 .../granule_ingester/pipeline/Modules.py           | 17 +++--
 .../granule_ingester/pipeline/Pipeline.py          | 11 ++-
 .../reading_processors/EccoReadingProcessor.py     |  8 +--
 .../reading_processors/GridReadingProcessor.py     |  8 +--
 .../reading_processors/SwathReadingProcessor.py    |  6 +-
 .../reading_processors/TileReadingProcessor.py     | 10 +--
 .../TimeSeriesReadingProcessor.py                  |  8 +--
 .../config_files/ingestion_config_testfile.yaml    |  4 +-
 granule_ingester/tests/pipeline/test_Pipeline.py   |  7 +-
 .../test_EccoReadingProcessor.py                   |  4 +-
 .../test_GridReadingProcessor.py                   |  2 +-
 .../test_SwathReadingProcessor.py                  |  4 +-
 30 files changed, 358 insertions(+), 170 deletions(-)
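
For reference, a collection entry in collections.yml now takes the shape sketched below. This is illustrative only: the dataset id and the recursive '**' path are hypothetical, while the field names (projection, dimensionNames, slices) match the test resources in this diff, and the projection value must be one of the names registered in Modules.py (Grid, ECCO, Swath, TimeSeries).

    collections:
      - id: EXAMPLE_DATASET                  # hypothetical dataset id
        path: /opt/data/example/**/*.nc      # '**' now matches nested directories (SDAP-284)
        priority: 1
        forward-processing-priority: 5
        projection: Grid                     # Grid, ECCO, Swath, or TimeSeries
        dimensionNames:
          latitude: lat
          longitude: lon
          time: time
          variable: analysed_sst
        slices:                              # step sizes passed to sliceFileByStepSize
          time: 1
          lat: 30
          lon: 30
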

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 3976b6d..031a3a9 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -11,7 +11,9 @@ from collection_manager.entities.exceptions import MissingValueCollectionError
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
-    variable: str
+    projection: str
+    dimension_names: frozenset
+    slices: frozenset
     path: str
     historical_priority: int
     forward_processing_priority: Optional[int] = None
@@ -25,7 +27,9 @@ class Collection:
             date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
 
             collection = Collection(dataset_id=properties['id'],
-                                    variable=properties['variable'],
+                                    projection=properties['projection'],
+                                    dimension_names=frozenset(properties['dimensionNames'].items()),
+                                    slices=frozenset(properties['slices'].items()),
                                     path=properties['path'],
                                     historical_priority=properties['priority'],
                                     forward_processing_priority=properties.get('forward-processing-priority', None),
@@ -51,4 +55,4 @@ class Collection:
             return fnmatch(file_path, self.path)
 
     def files_owned(self) -> List[str]:
-        return glob(self.path)
+        return glob(self.path, recursive=True)
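
A note on the recursive=True change above: Python's glob only expands '**' across directory levels when recursive=True is set, so this one-line change is what lets a configured path such as /opt/data/avhrr/**/*.nc (hypothetical example) own granules in nested subdirectories. The CollectionWatcher change further below schedules the watchdog observer recursively for the same reason.
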
diff --git a/collection_manager/collection_manager/resources/dataset_config_template.yml b/collection_manager/collection_manager/resources/dataset_config_template.yml
deleted file mode 100644
index d35a527..0000000
--- a/collection_manager/collection_manager/resources/dataset_config_template.yml
+++ /dev/null
@@ -1,20 +0,0 @@
-granule:
-  resource: {{granule}}
-slicer:
-  name: sliceFileByStepSize
-  dimension_step_sizes:
-    time: 1
-    lat: 30
-    lon: 30
-processors:
-  - name: GridReadingProcessor
-    latitude: lat
-    longitude: lon
-    time: time
-    variable_to_read: {{variable}}
-  - name: emptyTileFilter
-  - name: kelvinToCelsius
-  - name: tileSummary
-    dataset_name: {{dataset_id}}
-  - name: generateTileId
-
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ac61586..f08ade9 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,8 +1,7 @@
 import logging
 import os.path
 from typing import Dict
-
-import pystache
+import yaml
 
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
@@ -11,8 +10,7 @@ from collection_manager.services.history_manager.IngestionHistory import Ingesti
 
 logger = logging.getLogger(__name__)
 
-SUPPORTED_FILE_EXTENSIONS = ['.nc', '.h5']
-MESSAGE_TEMPLATE = os.path.join(os.path.dirname(__file__), '../resources/dataset_config_template.yml')
+SUPPORTED_FILE_EXTENSIONS = ['.nc', '.nc4', '.h5']
 
 
 class CollectionProcessor:
@@ -22,9 +20,6 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-        with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
-            self._config_template = config_template_file.read()
-
     async def process_collection(self, collection: Collection):
         """
         Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
@@ -63,7 +58,7 @@ class CollectionProcessor:
                          f"collection '{collection.dataset_id}'. Skipping.")
             return
 
-        dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
+        dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         await history_manager.push(granule)
 
@@ -78,13 +73,28 @@ class CollectionProcessor:
         return self._history_manager_cache[dataset_id]
 
     @staticmethod
-    def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str:
-        renderer = pystache.Renderer()
-        config_content = renderer.render(config_template,
-                                         {
-                                             'granule': granule_path,
-                                             'dataset_id': collection.dataset_id,
-                                             'variable': collection.variable
-                                         })
-        logger.debug(f"Templated dataset config:\n{config_content}")
-        return config_content
+    def _generate_ingestion_message(granule_path: str, collection: Collection) -> str:
+        config_dict = {
+            'granule': {
+                'resource': granule_path
+            },
+            'slicer': {
+                'name': 'sliceFileByStepSize',
+                'dimension_step_sizes': dict(collection.slices)
+            },
+            'processors': [
+                {
+                    'name': collection.projection,
+                    **dict(collection.dimension_names),
+                },
+                {'name': 'emptyTileFilter'},
+                {
+                    'name': 'tileSummary',
+                    'dataset_name': collection.dataset_id
+                },
+                {'name': 'generateTileId'}
+            ]
+        }
+        config_str = yaml.dump(config_dict)
+        logger.debug(f"Templated dataset config:\n{config_str}")
+        return config_str
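
For a collection like the example sketched above, _generate_ingestion_message would emit roughly the following RabbitMQ message body (reconstructed from the expected dict in test_CollectionProcessor.py below; yaml.dump sorts keys alphabetically). Note that the kelvinToCelsius processor, which the deleted template always included, is no longer added by default:

    granule:
      resource: /granules/test_granule.nc
    processors:
    - latitude: lat
      longitude: lon
      name: Grid
      variable: test_var
    - name: emptyTileFilter
    - dataset_name: test_dataset
      name: tileSummary
    - name: generateTileId
    slicer:
      dimension_step_sizes:
        lat: 30
        lon: 30
        time: 1
      name: sliceFileByStepSize
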
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 54c8877..1c7c1be 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -50,7 +50,7 @@ class CollectionWatcher:
                                      func=self._reload_and_reschedule)
         self._observer.start()
 
-    def collections(self) -> Set[Collection]:
+    def _collections(self) -> Set[Collection]:
         """
         Return a set of all Collections being watched.
         :return: A set of Collections
@@ -96,9 +96,9 @@ class CollectionWatcher:
                                                "next file modification.")
 
     def _get_updated_collections(self) -> Set[Collection]:
-        old_collections = self.collections()
+        old_collections = self._collections()
         self._load_collections()
-        return self.collections() - old_collections
+        return self._collections() - old_collections
 
     async def _reload_and_reschedule(self):
         try:
@@ -128,7 +128,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@@ -169,8 +169,11 @@ class _GranuleEventHandler(FileSystemEventHandler):
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+            try:
+                if collection.owns_file(event.src_path):
+                    self._loop.create_task(self._callback(event.src_path, collection))
+            except IsADirectoryError:
+                pass
 
     def on_modified(self, event):
         super().on_modified(event)
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 75803d1..483fff8 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -25,7 +25,7 @@ class MessagePublisher:
         """
         self._connection = await connect_robust(self._connection_string)
         self._channel = await self._connection.channel()
-        await self._channel.declare_queue(self._queue, durable=True)
+        await self._channel.declare_queue(self._queue, durable=True, arguments={'x-max-priority': 10})
 
     @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
     async def publish_message(self, body: str, priority: int = None):
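
A note on the x-max-priority argument above: RabbitMQ treats queue arguments as part of the queue's identity, so the granule ingester's MessageConsumer (see the MessageConsumer.py diff below) declares the same queue with the same arguments={'x-max-priority': 10}. Redeclaring an existing queue with mismatched arguments fails with a PRECONDITION_FAILED channel error, so upgrading an existing deployment to this version likely requires deleting and recreating the queue.
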
diff --git a/collection_manager/setup.py b/collection_manager/setup.py
index 1542486..0616d0f 100644
--- a/collection_manager/setup.py
+++ b/collection_manager/setup.py
@@ -31,6 +31,5 @@ setuptools.setup(
     ],
     python_requires='>=3.6',
     include_package_data=True,
-    data_files=[('.collection_manager/resources/', ['collection_manager/resources/dataset_config_template.yml'])],
     install_requires=pip_requirements
 )
diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py
index 46506d4..7e56c9d 100644
--- a/collection_manager/tests/entities/test_Collection.py
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -12,7 +12,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -23,7 +25,9 @@ class TestCollection(unittest.TestCase):
         pattern = os.path.join(os.path.dirname(__file__), "../resources/data/*.nc")
         collection = Collection(dataset_id="test_dataset",
                                 path=pattern,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -34,7 +38,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -45,7 +51,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -57,7 +65,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -69,7 +79,9 @@ class TestCollection(unittest.TestCase):
         pattern = os.path.join(directory, "test_*.nc")
         collection = Collection(dataset_id="test_dataset",
                                 path=pattern,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -82,7 +94,9 @@ class TestCollection(unittest.TestCase):
         pattern = os.path.join(directory, "test_*.nc")
         collection = Collection(dataset_id="test_dataset",
                                 path=pattern,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -93,8 +107,14 @@ class TestCollection(unittest.TestCase):
     def test_from_dict(self):
         collection_dict = {
             'id': 'test_id',
-            'variable': 'test_var',
             'path': '/some/path',
+            'projection': 'Grid',
+            'dimensionNames': {
+                'latitude': 'lat',
+                'longitude': 'lon',
+                'variable': 'test_var'
+            },
+            'slices': {'lat': 30, 'lon': 30, 'time': 1},
             'priority': 1,
             'forward-processing-priority': 2,
             'from': '2020-01-01T00:00:00+00:00',
@@ -102,7 +122,13 @@ class TestCollection(unittest.TestCase):
         }
 
         expected_collection = Collection(dataset_id='test_id',
-                                         variable='test_var',
+                                         projection="Grid",
+                                         slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]),
+                                         dimension_names=frozenset([
+                                             ('latitude', 'lat'),
+                                             ('longitude', 'lon'),
+                                             ('variable', 'test_var')
+                                         ]),
                                          path='/some/path',
                                          historical_priority=1,
                                          forward_processing_priority=2,
@@ -114,13 +140,25 @@ class TestCollection(unittest.TestCase):
     def test_from_dict_missing_optional_values(self):
         collection_dict = {
             'id': 'test_id',
-            'variable': 'test_var',
+            'projection': 'Grid',
+            'dimensionNames': {
+                'latitude': 'lat',
+                'longitude': 'lon',
+                'variable': 'test_var'
+            },
+            'slices': {'lat': 30, 'lon': 30, 'time': 1},
             'path': '/some/path',
             'priority': 3
         }
 
         expected_collection = Collection(dataset_id='test_id',
-                                         variable='test_var',
+                                         projection="Grid",
+                                         slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]),
+                                         dimension_names=frozenset([
+                                             ('latitude', 'lat'),
+                                             ('longitude', 'lon'),
+                                             ('variable', 'test_var')
+                                         ]),
                                          path='/some/path',
                                          historical_priority=3,
                                          forward_processing_priority=None,
diff --git a/collection_manager/tests/resources/collections.yml b/collection_manager/tests/resources/collections.yml
index 89524ec..44f795b 100644
--- a/collection_manager/tests/resources/collections.yml
+++ b/collection_manager/tests/resources/collections.yml
@@ -1,17 +1,47 @@
 collections:
   - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
     path: /opt/data/grace/*land*.nc
-    variable: lwe_thickness
     priority: 1
     forward-processing-priority: 5
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
 
   - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
     path: /opt/data/grace/*ocean*.nc
-    variable: lwe_thickness
     priority: 2
     forward-processing-priority: 6
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
 
   - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
     path: /opt/data/avhrr/*.nc
-    variable: analysed_sst
     priority: 1
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: analysed_sst
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
diff --git a/collection_manager/tests/resources/collections_alternate.yml b/collection_manager/tests/resources/collections_alternate.yml
index 3d7da95..f9dabda 100644
--- a/collection_manager/tests/resources/collections_alternate.yml
+++ b/collection_manager/tests/resources/collections_alternate.yml
@@ -1,17 +1,45 @@
 collections:
   - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
     path: /opt/data/grace/*land*.nc
-    variable: lwe_thickness
     priority: 1
     forward-processing-priority: 5
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
 
   - id: ID_CHANGED
     path: /opt/data/grace/*ocean*.nc
-    variable: lwe_thickness
     priority: 2
     forward-processing-priority: 6
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
 
   - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
     path: /opt/data/avhrr/*.nc
-    variable: analysed_sst
     priority: 1
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: analysed_sst
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index a7059d6..939b5d1 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -1,4 +1,5 @@
 import tempfile
+import yaml
 import unittest
 from unittest import mock
 
@@ -35,34 +36,47 @@ class TestCollectionProcessor(unittest.TestCase):
             self.assertIsNot(collection_processor._get_history_manager('bar'), history_manager)
 
     def test_fill_template(self):
-        template = """
-        granule:
-          resource: {{granule}}
-        processors:
-          - name: GridReadingProcessor
-            variable_to_read: {{variable}}
-          - name: tileSummary
-            dataset_name: {{dataset_id}}
-            """
-
-        expected = """
-        granule:
-          resource: /granules/test_granule.nc
-        processors:
-          - name: GridReadingProcessor
-            variable_to_read: test_variable
-          - name: tileSummary
-            dataset_name: test_dataset
-            """
+        expected = {
+            'granule': {
+                'resource': '/granules/test_granule.nc'
+            },
+            'processors': [
+                {
+                    'latitude': 'lat',
+                    'longitude': 'lon',
+                    'name': 'Grid',
+                    'variable': 'test_var'
+                },
+                {'name': 'emptyTileFilter'},
+                {'dataset_name': 'test_dataset', 'name': 'tileSummary'},
+                {'name': 'generateTileId'}
+            ],
+            'slicer': {
+                'dimension_step_sizes': {
+                    'lat': 30,
+                    'lon': 30,
+                    'time': 1
+                },
+                'name': 'sliceFileByStepSize'
+            }
+        }
         collection = Collection(dataset_id="test_dataset",
                                 path="/granules/test*.nc",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]),
+                                dimension_names=frozenset([
+                                    ('latitude', 'lat'),
+                                    ('longitude', 'lon'),
+                                    ('variable', 'test_var')
+                                ]),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
                                 date_to=None)
-        filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template)
-        self.assertEqual(filled, expected)
+        filled = CollectionProcessor._generate_ingestion_message("/granules/test_granule.nc", collection)
+        generated_yaml = yaml.load(filled, Loader=yaml.FullLoader)
+
+        self.assertEqual(expected, generated_yaml)
 
     @async_test
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock)
@@ -75,7 +89,9 @@ class TestCollectionProcessor(unittest.TestCase):
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
         collection = Collection(dataset_id="test_dataset",
                                 path="test_path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -100,7 +116,9 @@ class TestCollectionProcessor(unittest.TestCase):
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
         collection = Collection(dataset_id="test_dataset",
                                 path="test_path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -123,7 +141,9 @@ class TestCollectionProcessor(unittest.TestCase):
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
         collection = Collection(dataset_id="test_dataset",
                                 path="test_path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 date_from=None,
                                 date_to=None)
@@ -144,7 +164,9 @@ class TestCollectionProcessor(unittest.TestCase):
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
         collection = Collection(dataset_id="test_dataset",
                                 path="test_path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -165,7 +187,9 @@ class TestCollectionProcessor(unittest.TestCase):
         collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
         collection = Collection(dataset_id="test_dataset",
                                 path="test_path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index c9a75c0..e6bf15f 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -25,7 +25,7 @@ class TestCollectionWatcher(unittest.TestCase):
                 Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()),
             }
         }
-        flattened_collections = collection_watcher.collections()
+        flattened_collections = collection_watcher._collections()
         self.assertEqual(len(flattened_collections), 4)
 
     def test_load_collections_loads_all_collections(self):
@@ -60,7 +60,7 @@ class TestCollectionWatcher(unittest.TestCase):
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
         updated_collections = collection_watcher._get_updated_collections()
-        self.assertSetEqual(updated_collections, collection_watcher.collections())
+        self.assertSetEqual(updated_collections, collection_watcher._collections())
 
     def test_get_updated_collections_returns_no_collections(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
@@ -87,7 +87,9 @@ class TestCollectionWatcher(unittest.TestCase):
 
         collection = Collection(dataset_id="test_dataset",
                                 path="/absolute/path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -100,7 +102,9 @@ class TestCollectionWatcher(unittest.TestCase):
 
         collection = Collection(dataset_id="test_dataset",
                                 path="relative/path",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -113,7 +117,9 @@ class TestCollectionWatcher(unittest.TestCase):
 
         collection = Collection(dataset_id="test_dataset",
                                 path="/resources/*.nc",
-                                variable="test_variable",
+                                projection="Grid",
+                                slices=frozenset(),
+                                dimension_names=frozenset(),
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -127,9 +133,19 @@ class TestCollectionWatcher(unittest.TestCase):
         collections_str = f"""collections:
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
   path: {granule_dir.name}
-  variable: lwe_thickness
   priority: 1
-  forward-processing-priority: 5"""
+  forward-processing-priority: 5
+  projection: Grid
+  dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+  slices:
+      time: 1
+      lat: 30
+      lon: 30
+  """
         collections_config.write(collections_str.encode("utf-8"))
 
         collection_callback = AsyncMock()
@@ -143,9 +159,18 @@ class TestCollectionWatcher(unittest.TestCase):
         collections_str = f"""
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
   path: {granule_dir.name}
-  variable: lwe_thickness
   priority: 10
   forward-processing-priority: 5
+  projection: Grid
+  dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+  slices:
+      time: 1
+      lat: 30
+      lon: 30
         """
         collections_config.write(collections_str.encode("utf-8"))
 
@@ -163,9 +188,18 @@ class TestCollectionWatcher(unittest.TestCase):
 collections:
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
   path: {granule_dir.name}
-  variable: lwe_thickness
   priority: 1
   forward-processing-priority: 5
+  projection: Grid
+  dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+  slices:
+      time: 1
+      lat: 30
+      lon: 30
             """
             collections_config.write(collections_str.encode("utf-8"))
 
@@ -187,9 +221,18 @@ collections:
 collections:
 - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
   path: {granule_dir.name}
-  variable: lwe_thickness
   priority: 1
   forward-processing-priority: 5
+  projection: Grid
+  dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+  slices:
+      time: 1
+      lat: 30
+      lon: 30
             """
             collections_config.write(collections_str.encode("utf-8"))
             new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
diff --git a/config_operator/tests/resources/collections.yml b/config_operator/tests/resources/collections.yml
index 42d2fbc..3414c5a 100644
--- a/config_operator/tests/resources/collections.yml
+++ b/config_operator/tests/resources/collections.yml
@@ -5,5 +5,14 @@ avhrr-oi-analysed-sst:
 
 avhrr-oi-analysed-sst2:
   path: resources/history_manager/data/avhrr_oi/*.nc
-  variable: analysed_sst
   priority: 1
+  projection: Grid
+  dimensionNames:
+    latitude: lat
+    longitude: lon
+    time: time
+    variable: analysed_sst
+  slices:
+    time: 1
+    lat: 30
+    lon: 30
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
similarity index 98%
rename from granule_ingester/granule_ingester/consumer/Consumer.py
rename to granule_ingester/granule_ingester/consumer/MessageConsumer.py
index 6c72837..4d6c07b 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
@@ -25,7 +25,7 @@ from granule_ingester.pipeline import Pipeline
 logger = logging.getLogger(__name__)
 
 
-class Consumer(HealthCheck):
+class MessageConsumer(HealthCheck):
 
     def __init__(self,
                  rabbitmq_host,
@@ -95,7 +95,7 @@ class Consumer(HealthCheck):
     async def start_consuming(self, pipeline_max_concurrency=16):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
-        queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
+        queue = await channel.declare_queue(self._rabbitmq_queue, durable=True, arguments={'x-max-priority': 10})
         queue_iter = queue.iterator()
         async for message in queue_iter:
             try:
diff --git a/granule_ingester/granule_ingester/consumer/__init__.py b/granule_ingester/granule_ingester/consumer/__init__.py
index 35d075b..bb782d5 100644
--- a/granule_ingester/granule_ingester/consumer/__init__.py
+++ b/granule_ingester/granule_ingester/consumer/__init__.py
@@ -1 +1 @@
-from granule_ingester.consumer.Consumer import Consumer
+from granule_ingester.consumer.MessageConsumer import MessageConsumer
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index c648b99..fdd03e5 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -6,7 +6,11 @@ class PipelineRunningError(Exception):
     pass
 
 
-class TileProcessingError(Exception):
+class TileProcessingError(PipelineRunningError):
+    pass
+
+
+class GranuleLoadingError(PipelineRunningError):
     pass
 
 
@@ -21,6 +25,7 @@ class RabbitMQLostConnectionError(LostConnectionError):
 class CassandraLostConnectionError(LostConnectionError):
     pass
 
+
 class SolrLostConnectionError(LostConnectionError):
     pass
 
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index ea0969f..f2429b1 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,11 +1,8 @@
-from .Exceptions import CassandraFailedHealthCheckError
-from .Exceptions import CassandraLostConnectionError
-from .Exceptions import FailedHealthCheckError
-from .Exceptions import LostConnectionError
-from .Exceptions import PipelineBuildingError
-from .Exceptions import PipelineRunningError
-from .Exceptions import RabbitMQFailedHealthCheckError
-from .Exceptions import RabbitMQLostConnectionError
-from .Exceptions import SolrFailedHealthCheckError
-from .Exceptions import SolrLostConnectionError
-from .Exceptions import TileProcessingError
+from .Exceptions import (CassandraFailedHealthCheckError,
+                         CassandraLostConnectionError, FailedHealthCheckError,
+                         GranuleLoadingError, LostConnectionError,
+                         PipelineBuildingError, PipelineRunningError,
+                         RabbitMQFailedHealthCheckError,
+                         RabbitMQLostConnectionError,
+                         SolrFailedHealthCheckError, SolrLostConnectionError,
+                         TileProcessingError)
diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
index c28ffbb..6377de0 100644
--- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
+++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py
@@ -21,6 +21,8 @@ from urllib import parse
 import aioboto3
 import xarray as xr
 
+from granule_ingester.exceptions import GranuleLoadingError
+
 logger = logging.getLogger(__name__)
 
 
@@ -52,7 +54,12 @@ class GranuleLoader:
             raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme))
 
         granule_name = os.path.basename(self._resource)
-        return xr.open_dataset(file_path, lock=False), granule_name
+        try:
+            return xr.open_dataset(file_path, lock=False), granule_name
+        except FileNotFoundError:
+            raise GranuleLoadingError(f"The granule file {self._resource} does not exist.")
+        except Exception:
+            raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.")
 
     @staticmethod
     async def _download_s3_file(url: str):
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 15390fd..b5a429c 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -20,7 +20,7 @@ import sys
 from functools import partial
 from typing import List
 
-from granule_ingester.consumer import Consumer
+from granule_ingester.consumer import MessageConsumer
 from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore, SolrStore
@@ -116,16 +116,16 @@ async def main(loop):
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
-    consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
-                        rabbitmq_username=args.rabbitmq_username,
-                        rabbitmq_password=args.rabbitmq_password,
-                        rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory,
-                                                   cassandra_contact_points,
-                                                   cassandra_port,
-                                                   cassandra_username,
-                                                   cassandra_password),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+    consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
+                               rabbitmq_username=args.rabbitmq_username,
+                               rabbitmq_password=args.rabbitmq_password,
+                               rabbitmq_queue=args.rabbitmq_queue,
+                               data_store_factory=partial(cassandra_factory,
+                                                          cassandra_contact_points,
+                                                          cassandra_port,
+                                                          cassandra_username,
+                                                          cassandra_password),
+                               metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
     try:
         solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
         await run_health_checks([CassandraStore(cassandra_contact_points,
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
index 2cf2245..d1950dc 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,14 +1,19 @@
-from granule_ingester.processors import *
-from granule_ingester.processors.reading_processors import *
-from granule_ingester.slicers import *
-from granule_ingester.granule_loaders import *
+from granule_ingester.processors import GenerateTileId, TileSummarizingProcessor, EmptyTileFilter, KelvinToCelsius
+from granule_ingester.processors.reading_processors import (EccoReadingProcessor,
+                                                            GridReadingProcessor,
+                                                            SwathReadingProcessor,
+                                                            TimeSeriesReadingProcessor)
+from granule_ingester.slicers import SliceFileByStepSize
+from granule_ingester.granule_loaders import GranuleLoader
 
 modules = {
     "granule": GranuleLoader,
     "sliceFileByStepSize": SliceFileByStepSize,
     "generateTileId": GenerateTileId,
-    "EccoReadingProcessor": EccoReadingProcessor,
-    "GridReadingProcessor": GridReadingProcessor,
+    "ECCO": EccoReadingProcessor,
+    "Grid": GridReadingProcessor,
+    "TimeSeries": TimeSeriesReadingProcessor,
+    "Swath": SwathReadingProcessor,
     "tileSummary": TileSummarizingProcessor,
     "emptyTileFilter": EmptyTileFilter,
     "kelvinToCelsius": KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index dabca81..86bf9c8 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -153,10 +153,13 @@ class Pipeline:
                        metadata_store_factory,
                        tile_processors,
                        max_concurrency)
+        except PipelineBuildingError:
+            raise
         except KeyError as e:
             raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
-        except Exception:
-            raise PipelineBuildingError("Cannot build pipeline.")
+        except Exception as e:
+            logger.exception(e)
+            raise PipelineBuildingError(f"Cannot build pipeline because of the following error: {e}")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -166,7 +169,9 @@ class Pipeline:
             logger.debug("Loaded processor {}.".format(module_class))
             processor_module = module_class(**module_config)
         except KeyError:
-            raise RuntimeError("'{}' is not a valid processor.".format(module_name))
+            raise PipelineBuildingError(f"'{module_name}' is not a valid processor.")
+        except Exception as e:
+            raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}")
 
         return processor_module
 
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 1876013..8cc24d0 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -10,14 +10,14 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 class EccoReadingProcessor(TileReadingProcessor):
     def __init__(self,
-                 variable_to_read,
+                 variable,
                  latitude,
                  longitude,
                  tile,
                  depth=None,
                  time=None,
                  **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+        super().__init__(variable, latitude, longitude, **kwargs)
 
         self.depth = depth
         self.time = time
@@ -31,8 +31,8 @@ class EccoReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
         lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][
-            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
+        data_subset = ds[self.variable][
+            type(self)._slices_for_variable(ds[self.variable], dimensions_to_slices)]
         data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
 
         new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 4354f9e..1ba76a2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class GridReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, depth=None, time=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
         self.depth = depth
         self.time = time
 
@@ -22,8 +22,8 @@ class GridReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
         lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
-                                                                                dimensions_to_slices)]
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+                                                                        dimensions_to_slices)]
         data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
 
         if self.depth:
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fec28ca..5b8072a 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class SwathReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
         self.depth = depth
         self.time = time
 
@@ -25,7 +25,7 @@ class SwathReadingProcessor(TileReadingProcessor):
         time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
         time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
                                                                                 dimensions_to_slices)]
         data_subset = np.ma.filled(data_subset, np.NaN)
 
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 8b69ad2..aa70db3 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -27,8 +27,8 @@ from granule_ingester.processors.TileProcessor import TileProcessor
 
 class TileReadingProcessor(TileProcessor, ABC):
 
-    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
-        self.variable_to_read = variable_to_read
+    def __init__(self, variable: str, latitude: str, longitude: str, *args, **kwargs):
+        self.variable = variable
         self.latitude = latitude
         self.longitude = longitude
 
@@ -38,11 +38,11 @@ class TileReadingProcessor(TileProcessor, ABC):
 
             output_tile = nexusproto.NexusTile()
             output_tile.CopyFrom(tile)
-            output_tile.summary.data_var_name = self.variable_to_read
+            output_tile.summary.data_var_name = self.variable
 
             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
-        except Exception:
-            raise TileProcessingError("Could not generate tiles from the granule.")
+        except Exception as e:
+            raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index 2831c0c..c4aae25 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class TimeSeriesReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
 
         self.depth = depth
         self.time = time
@@ -23,8 +23,8 @@ class TimeSeriesReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(lat_subset, np.NaN)
         lon_subset = np.ma.filled(lon_subset, np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
-                                                                                dimensions_to_slices)]
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+                                                                        dimensions_to_slices)]
         data_subset = np.ma.filled(data_subset, np.NaN)
 
         if self.depth:
diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
index 9af889d..63df51a 100644
--- a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
+++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
@@ -7,11 +7,11 @@ slicer:
     lat: 33
     lon: 26
 processors:
-  - name: EccoReadingProcessor
+  - name: ECCO
     latitude: YC
     longitude: XC
     time: time
     depth: Z
     tile: tile
-    variable_to_read: THETA
+    variable: THETA
   - name: generateTileId
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index 34e66c6..27ec72b 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -6,8 +6,9 @@ from nexusproto import DataTile_pb2 as nexusproto
 from granule_ingester.pipeline.Pipeline import Pipeline
 from granule_ingester.processors import GenerateTileId
 from granule_ingester.processors.reading_processors import EccoReadingProcessor
-from granule_ingester.slicers.SliceFileByStepSize import *
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
 from granule_ingester.writers import DataStore, MetadataStore
+from granule_ingester.exceptions import PipelineBuildingError
 
 
 class TestPipeline(unittest.TestCase):
@@ -70,7 +71,7 @@ class TestPipeline(unittest.TestCase):
             "name": "MockModule",
             "bogus_param": True
         }
-        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+        self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings)
 
     def test_parse_module_with_missing_parameters(self):
         module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams}
@@ -78,7 +79,7 @@ class TestPipeline(unittest.TestCase):
             "name": "MockModule"
         }
 
-        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+        self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings)
 
     def test_process_tile(self):
         # class MockIdProcessor:
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index f2e9f29..03d5054 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -10,7 +10,7 @@ from granule_ingester.processors.reading_processors import EccoReadingProcessor
 class TestEccoReadingProcessor(unittest.TestCase):
 
     def test_generate_tile(self):
-        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+        reading_processor = EccoReadingProcessor(variable='OBP',
                                                  latitude='YC',
                                                  longitude='XC',
                                                  time='time',
@@ -40,7 +40,7 @@ class TestEccoReadingProcessor(unittest.TestCase):
             self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
 
     def test_generate_tile_with_dims_out_of_order(self):
-        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+        reading_processor = EccoReadingProcessor(variable='OBP',
                                                  latitude='YC',
                                                  longitude='XC',
                                                  time='time',
diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
index aec3ae8..31cb547 100644
--- a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
@@ -209,7 +209,7 @@ class TestReadInterpEccoData(unittest.TestCase):
                                            time='time')
 
     def test_read_indexed_ecco(self):
-        reading_processor = GridReadingProcessor(variable_to_read='OBP',
+        reading_processor = GridReadingProcessor(variable='OBP',
                                                  latitude='latitude',
                                                  longitude='longitude',
                                                  time='time')
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index 55ac4fc..db623f5 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -24,7 +24,7 @@ from granule_ingester.processors.reading_processors import SwathReadingProcessor
 
 class TestReadAscatbData(unittest.TestCase):
     def test_read_not_empty_ascatb(self):
-        reading_processor = SwathReadingProcessor(variable_to_read='wind_speed',
+        reading_processor = SwathReadingProcessor(variable='wind_speed',
                                                   latitude='lat',
                                                   longitude='lon',
                                                   time='time')
@@ -50,7 +50,7 @@ class TestReadAscatbData(unittest.TestCase):
 class TestReadSmapData(unittest.TestCase):
     def test_read_not_empty_smap(self):
         reading_processor = SwathReadingProcessor(
-            variable_to_read='smap_sss',
+            variable='smap_sss',
             latitude='lat',
             longitude='lon',
             time='row_time')