You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/19 01:13:44 UTC

[incubator-sdap-ingester] branch configure-processors updated: wip

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch configure-processors
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/configure-processors by this push:
     new 03ed419  wip
03ed419 is described below

commit 03ed4191c1daa99a577fceda257d1c5a562e8b9c
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 18 18:13:28 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 10 ++++--
 .../services/CollectionProcessor.py                | 37 +++++++++++++++-------
 .../services/CollectionWatcher.py                  |  6 ++--
 .../tests/entities/test_Collection.py              | 20 +++++++++---
 .../tests/services/test_CollectionWatcher.py       |  4 +--
 config_operator/tests/resources/collections.yml    | 11 ++++++-
 .../granule_ingester/pipeline/Modules.py           | 17 ++++++----
 .../granule_ingester/pipeline/Pipeline.py          | 11 +++++--
 .../reading_processors/EccoReadingProcessor.py     |  8 ++---
 .../reading_processors/GridReadingProcessor.py     |  8 ++---
 .../reading_processors/SwathReadingProcessor.py    |  6 ++--
 .../reading_processors/TileReadingProcessor.py     |  6 ++--
 .../TimeSeriesReadingProcessor.py                  |  8 ++---
 .../config_files/ingestion_config_testfile.yaml    |  4 +--
 granule_ingester/tests/pipeline/test_Pipeline.py   |  7 ++--
 .../test_EccoReadingProcessor.py                   |  4 +--
 .../test_GridReadingProcessor.py                   |  2 +-
 .../test_SwathReadingProcessor.py                  |  4 +--
 18 files changed, 111 insertions(+), 62 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 3976b6d..0ca12d9 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -3,7 +3,7 @@ from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
-from typing import List, Optional
+from typing import List, Optional, Dict
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
@@ -11,7 +11,9 @@ from collection_manager.entities.exceptions import MissingValueCollectionError
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
-    variable: str
+    projection: str
+    dimension_names: Dict[str, str]
+    slices: Dict[str, int]
     path: str
     historical_priority: int
     forward_processing_priority: Optional[int] = None
@@ -25,7 +27,9 @@ class Collection:
             date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
 
             collection = Collection(dataset_id=properties['id'],
-                                    variable=properties['variable'],
+                                    projection=properties['projection'],
+                                    dimension_names=frozenset(properties['dimensionNames'].items()),
+                                    slices=frozenset(properties['slices'].items()),
                                     path=properties['path'],
                                     historical_priority=properties['priority'],
                                     forward_processing_priority=properties.get('forward-processing-priority', None),
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ac61586..88f512b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,8 +1,7 @@
 import logging
 import os.path
 from typing import Dict
-
-import pystache
+import yaml
 
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
@@ -79,12 +78,28 @@ class CollectionProcessor:
 
     @staticmethod
     def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str:
-        renderer = pystache.Renderer()
-        config_content = renderer.render(config_template,
-                                         {
-                                             'granule': granule_path,
-                                             'dataset_id': collection.dataset_id,
-                                             'variable': collection.variable
-                                         })
-        logger.debug(f"Templated dataset config:\n{config_content}")
-        return config_content
+        config_dict = {
+            'granule': {
+                'resource': granule_path
+            },
+            'slicer': {
+                'name': 'sliceFileByStepSize',
+                'dimension_step_sizes': dict(collection.slices)
+            },
+            'processors': [
+                {
+                    'name': collection.projection,
+                    **dict(collection.dimension_names),
+                },
+                {'name': 'emptyTileFilter'},
+                {'name': 'kelvinToCelsius'},
+                {
+                    'name': 'tileSummary',
+                    'dataset_name': collection.dataset_id
+                },
+                {'name': 'generateTileId'}
+            ]
+        }
+        config_str = yaml.dump(config_dict)
+        logger.debug(f"Templated dataset config:\n{config_str}")
+        return config_str
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 54c8877..80ad885 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -50,7 +50,7 @@ class CollectionWatcher:
                                      func=self._reload_and_reschedule)
         self._observer.start()
 
-    def collections(self) -> Set[Collection]:
+    def _collections(self) -> Set[Collection]:
         """
         Return a set of all Collections being watched.
         :return: A set of Collections
@@ -96,9 +96,9 @@ class CollectionWatcher:
                                                "next file modification.")
 
     def _get_updated_collections(self) -> Set[Collection]:
-        old_collections = self.collections()
+        old_collections = self._collections()
         self._load_collections()
-        return self.collections() - old_collections
+        return self._collections() - old_collections
 
     async def _reload_and_reschedule(self):
         try:
diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py
index 46506d4..2b83fcb 100644
--- a/collection_manager/tests/entities/test_Collection.py
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -12,7 +12,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -23,7 +25,9 @@ class TestCollection(unittest.TestCase):
         pattern = os.path.join(os.path.dirname(__file__), "../resources/data/*.nc")
         collection = Collection(dataset_id="test_dataset",
                                 path=pattern,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -34,7 +38,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -45,7 +51,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
@@ -57,7 +65,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
                                 forward_processing_priority=2,
                                 date_from=None,
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index c9a75c0..0dc924b 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -25,7 +25,7 @@ class TestCollectionWatcher(unittest.TestCase):
                 Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()),
             }
         }
-        flattened_collections = collection_watcher.collections()
+        flattened_collections = collection_watcher._collections()
         self.assertEqual(len(flattened_collections), 4)
 
     def test_load_collections_loads_all_collections(self):
@@ -60,7 +60,7 @@ class TestCollectionWatcher(unittest.TestCase):
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
         updated_collections = collection_watcher._get_updated_collections()
-        self.assertSetEqual(updated_collections, collection_watcher.collections())
+        self.assertSetEqual(updated_collections, collection_watcher._collections())
 
     def test_get_updated_collections_returns_no_collections(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
diff --git a/config_operator/tests/resources/collections.yml b/config_operator/tests/resources/collections.yml
index 42d2fbc..3414c5a 100644
--- a/config_operator/tests/resources/collections.yml
+++ b/config_operator/tests/resources/collections.yml
@@ -5,5 +5,14 @@ avhrr-oi-analysed-sst:
 
 avhrr-oi-analysed-sst2:
   path: resources/history_manager/data/avhrr_oi/*.nc
-  variable: analysed_sst
   priority: 1
+  projection: Grid
+  dimensionNames:
+    latitude: lat
+    longitude: lon
+    time: time
+    variable: analysed_sst
+  slices:
+    time: 1
+    lat: 30
+    lon: 30
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py
index 2cf2245..d1950dc 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -1,14 +1,19 @@
-from granule_ingester.processors import *
-from granule_ingester.processors.reading_processors import *
-from granule_ingester.slicers import *
-from granule_ingester.granule_loaders import *
+from granule_ingester.processors import GenerateTileId, TileSummarizingProcessor, EmptyTileFilter, KelvinToCelsius
+from granule_ingester.processors.reading_processors import (EccoReadingProcessor,
+                                                            GridReadingProcessor,
+                                                            SwathReadingProcessor,
+                                                            TimeSeriesReadingProcessor)
+from granule_ingester.slicers import SliceFileByStepSize
+from granule_ingester.granule_loaders import GranuleLoader
 
 modules = {
     "granule": GranuleLoader,
     "sliceFileByStepSize": SliceFileByStepSize,
     "generateTileId": GenerateTileId,
-    "EccoReadingProcessor": EccoReadingProcessor,
-    "GridReadingProcessor": GridReadingProcessor,
+    "ECCO": EccoReadingProcessor,
+    "Grid": GridReadingProcessor,
+    "TimeSeries": TimeSeriesReadingProcessor,
+    "Swath": SwathReadingProcessor,
     "tileSummary": TileSummarizingProcessor,
     "emptyTileFilter": EmptyTileFilter,
     "kelvinToCelsius": KelvinToCelsius
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index dabca81..86bf9c8 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -153,10 +153,13 @@ class Pipeline:
                        metadata_store_factory,
                        tile_processors,
                        max_concurrency)
+        except PipelineBuildingError:
+            raise
         except KeyError as e:
             raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
-        except Exception:
-            raise PipelineBuildingError("Cannot build pipeline.")
+        except Exception as e:
+            logger.exception(e)
+            raise PipelineBuildingError(f"Cannot build pipeline because of the following error: {e}")
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -166,7 +169,9 @@ class Pipeline:
             logger.debug("Loaded processor {}.".format(module_class))
             processor_module = module_class(**module_config)
         except KeyError:
-            raise RuntimeError("'{}' is not a valid processor.".format(module_name))
+            raise PipelineBuildingError(f"'{module_name}' is not a valid processor.")
+        except Exception as e:
+            raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}")
 
         return processor_module
 
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 1876013..8cc24d0 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -10,14 +10,14 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 class EccoReadingProcessor(TileReadingProcessor):
     def __init__(self,
-                 variable_to_read,
+                 variable,
                  latitude,
                  longitude,
                  tile,
                  depth=None,
                  time=None,
                  **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+        super().__init__(variable, latitude, longitude, **kwargs)
 
         self.depth = depth
         self.time = time
@@ -31,8 +31,8 @@ class EccoReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
         lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][
-            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
+        data_subset = ds[self.variable][
+            type(self)._slices_for_variable(ds[self.variable], dimensions_to_slices)]
         data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
 
         new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 4354f9e..1ba76a2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class GridReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, depth=None, time=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
         self.depth = depth
         self.time = time
 
@@ -22,8 +22,8 @@ class GridReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
         lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
-                                                                                dimensions_to_slices)]
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+                                                                        dimensions_to_slices)]
         data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
 
         if self.depth:
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index fec28ca..5b8072a 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class SwathReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
         self.depth = depth
         self.time = time
 
@@ -25,7 +25,7 @@ class SwathReadingProcessor(TileReadingProcessor):
         time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
         time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
                                                                                 dimensions_to_slices)]
         data_subset = np.ma.filled(data_subset, np.NaN)
 
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 8b69ad2..a1dfd8a 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -27,8 +27,8 @@ from granule_ingester.processors.TileProcessor import TileProcessor
 
 class TileReadingProcessor(TileProcessor, ABC):
 
-    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
-        self.variable_to_read = variable_to_read
+    def __init__(self, variable: str, latitude: str, longitude: str, *args, **kwargs):
+        self.variable = variable
         self.latitude = latitude
         self.longitude = longitude
 
@@ -38,7 +38,7 @@ class TileReadingProcessor(TileProcessor, ABC):
 
             output_tile = nexusproto.NexusTile()
             output_tile.CopyFrom(tile)
-            output_tile.summary.data_var_name = self.variable_to_read
+            output_tile.summary.data_var_name = self.variable
 
             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
         except Exception:
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index 2831c0c..c4aae25 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import
 
 
 class TimeSeriesReadingProcessor(TileReadingProcessor):
-    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
-        super().__init__(variable_to_read, latitude, longitude, **kwargs)
+    def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
 
         self.depth = depth
         self.time = time
@@ -23,8 +23,8 @@ class TimeSeriesReadingProcessor(TileReadingProcessor):
         lat_subset = np.ma.filled(lat_subset, np.NaN)
         lon_subset = np.ma.filled(lon_subset, np.NaN)
 
-        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
-                                                                                dimensions_to_slices)]
+        data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+                                                                        dimensions_to_slices)]
         data_subset = np.ma.filled(data_subset, np.NaN)
 
         if self.depth:
diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
index 9af889d..63df51a 100644
--- a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
+++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml
@@ -7,11 +7,11 @@ slicer:
     lat: 33
     lon: 26
 processors:
-  - name: EccoReadingProcessor
+  - name: ECCO
     latitude: YC
     longitude: XC
     time: time
     depth: Z
     tile: tile
-    variable_to_read: THETA
+    variable: THETA
   - name: generateTileId
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index 34e66c6..27ec72b 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -6,8 +6,9 @@ from nexusproto import DataTile_pb2 as nexusproto
 from granule_ingester.pipeline.Pipeline import Pipeline
 from granule_ingester.processors import GenerateTileId
 from granule_ingester.processors.reading_processors import EccoReadingProcessor
-from granule_ingester.slicers.SliceFileByStepSize import *
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
 from granule_ingester.writers import DataStore, MetadataStore
+from granule_ingester.exceptions import PipelineBuildingError
 
 
 class TestPipeline(unittest.TestCase):
@@ -70,7 +71,7 @@ class TestPipeline(unittest.TestCase):
             "name": "MockModule",
             "bogus_param": True
         }
-        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+        self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings)
 
     def test_parse_module_with_missing_parameters(self):
         module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams}
@@ -78,7 +79,7 @@ class TestPipeline(unittest.TestCase):
             "name": "MockModule"
         }
 
-        self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings)
+        self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings)
 
     def test_process_tile(self):
         # class MockIdProcessor:
diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index f2e9f29..03d5054 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -10,7 +10,7 @@ from granule_ingester.processors.reading_processors import EccoReadingProcessor
 class TestEccoReadingProcessor(unittest.TestCase):
 
     def test_generate_tile(self):
-        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+        reading_processor = EccoReadingProcessor(variable='OBP',
                                                  latitude='YC',
                                                  longitude='XC',
                                                  time='time',
@@ -40,7 +40,7 @@ class TestEccoReadingProcessor(unittest.TestCase):
             self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7])
 
     def test_generate_tile_with_dims_out_of_order(self):
-        reading_processor = EccoReadingProcessor(variable_to_read='OBP',
+        reading_processor = EccoReadingProcessor(variable='OBP',
                                                  latitude='YC',
                                                  longitude='XC',
                                                  time='time',
diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
index aec3ae8..31cb547 100644
--- a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
@@ -209,7 +209,7 @@ class TestReadInterpEccoData(unittest.TestCase):
                                            time='time')
 
     def test_read_indexed_ecco(self):
-        reading_processor = GridReadingProcessor(variable_to_read='OBP',
+        reading_processor = GridReadingProcessor(variable='OBP',
                                                  latitude='latitude',
                                                  longitude='longitude',
                                                  time='time')
diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index 55ac4fc..db623f5 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -24,7 +24,7 @@ from granule_ingester.processors.reading_processors import SwathReadingProcessor
 
 class TestReadAscatbData(unittest.TestCase):
     def test_read_not_empty_ascatb(self):
-        reading_processor = SwathReadingProcessor(variable_to_read='wind_speed',
+        reading_processor = SwathReadingProcessor(variable='wind_speed',
                                                   latitude='lat',
                                                   longitude='lon',
                                                   time='time')
@@ -50,7 +50,7 @@ class TestReadAscatbData(unittest.TestCase):
 class TestReadSmapData(unittest.TestCase):
     def test_read_not_empty_smap(self):
         reading_processor = SwathReadingProcessor(
-            variable_to_read='smap_sss',
+            variable='smap_sss',
             latitude='lat',
             longitude='lon',
             time='row_time')