Posted to commits@sdap.apache.org by tl...@apache.org on 2021/12/09 19:47:24 UTC

[incubator-sdap-ingester] 01/02: remove autocommit in solr connection, commit after granule is fully processed

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch less_solr_commit
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 4aa0ff6dc85a9ffa80a43fe28da52ac7a01be7b6
Author: Thomas Loubrieu <lo...@jpl.nasa.gov>
AuthorDate: Wed Dec 8 19:41:26 2021 -0500

    remove autocommit in solr connection, commit after granule is fully processed
---
 granule_ingester/README.md                         | 10 +++++-
 granule_ingester/conda-requirements.txt            | 10 ------
 granule_ingester/granule_ingester/main.py          |  2 +-
 .../granule_ingester/pipeline/Pipeline.py          |  7 ++--
 .../processors/ForceAscendingLatitude.py           |  1 -
 .../GridMultiVariableReadingProcessor.py           | 40 +++++++++++++++++++++-
 .../granule_ingester/writers/MetadataStore.py      |  4 +++
 .../granule_ingester/writers/SolrStore.py          | 11 ++++--
 granule_ingester/requirements.txt                  | 27 ++++++++++-----
 granule_ingester/requirements2.txt                 |  9 +++++
 granule_ingester/setup.py                          |  8 ++---
 .../test_TileSummarizingProcessor.py               |  6 ++--
 .../tests/slicers/test_SliceFileByStepSize.py      |  2 +-
 13 files changed, 102 insertions(+), 35 deletions(-)

diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 1339835..34d292b 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -9,7 +9,15 @@ data to Cassandra and Solr.
 
 ## Prerequisites
 
-Python 3.7
+Python 3.7, conda
+
+Create a virtual environment:
+
+    conda create -n nexus-ingester python=3.7
+
+Activate it:
+
+    conda activate nexus-ingester
 
 ## Building the service
 From `incubator-sdap-ingester`, run:
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index 810e278..e69de29 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -1,10 +0,0 @@
-numpy==1.15.4
-scipy
-netcdf4==1.5.3
-pandas==1.0.4
-pytz==2019.3
-xarray
-pyyaml==5.3.1
-requests==2.23.0
-aiohttp==3.6.2
-tenacity
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 84ab004..029df1c 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -225,7 +225,7 @@ async def main(loop):
 
             async with consumer:
                 logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-                await consumer.start_consuming(args.max_threads)
+                await consumer.start_consuming(int(args.max_threads))
         except FailedHealthCheckError as e:
             logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
         except LostConnectionError as e:
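
The int(...) cast suggests args.max_threads reaches this point as a string. An alternative is to have argparse do the conversion at parse time so the value is an int everywhere downstream; a minimal sketch, assuming main.py declares the flag with argparse (the flag name and default below are illustrative, not taken from the repository):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--max-threads',
                        dest='max_threads',
                        type=int,       # argparse converts once, at parse time
                        default=16)
    args = parser.parse_args()
    # args.max_threads is already an int; no int(...) casts are needed later.
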
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index abc07a0..9781a0f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -101,7 +101,7 @@ class Pipeline:
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
-        self._max_concurrency = max_concurrency
+        self._max_concurrency: int = max_concurrency
 
         # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
@@ -189,8 +189,8 @@ class Pipeline:
                                       self._data_store_factory,
                                       self._metadata_store_factory,
                                       shared_memory),
-                            maxtasksperchild=self._max_concurrency,
-                            childconcurrency=self._max_concurrency) as pool:
+                            maxtasksperchild=int(self._max_concurrency),
+                            childconcurrency=int(self._max_concurrency)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
@@ -204,6 +204,7 @@ class Pipeline:
                         # await asyncio.sleep(1)
                         raise pickle.loads(shared_memory.error)
 
+        self._metadata_store_factory.commit()
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
diff --git a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
index 9dc3407..dac9646 100644
--- a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -40,7 +40,6 @@ class ForceAscendingLatitude(TileProcessor):
     def process(self, tile, *args, **kwargs):
         """
         This method will reverse the ordering of latitude values in a tile if necessary to ensure that the latitude values are ascending.
-​
         :param self:
         :param tile: The nexus_tile
         :return: Tile data with altered latitude values
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index c36b8d2..cde440d 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -1,5 +1,7 @@
 import logging
+import time
 from typing import Dict
+from itertools import takewhile
 
 import cftime
 import numpy as np
@@ -48,6 +50,16 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
         logger.debug(f'reading as banded grid as self.variable is a list. self.variable: {self.variable}')
         if len(self.variable) < 1:
             raise ValueError('list of variables is empty; need at least 1 variable')
+
+        start = time.time()
+
+
+        data_subset_variables, data_subset_variables_dims = self._get_variable_values(ds, dimensions_to_slices)
+        input_tile.summary.data_dim_names.extend(data_subset_variables_dims)
+        data_subset_variables = np.ma.filled(data_subset_variables, np.NaN)
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset_variables))
+
+        '''
         data_subset = [ds[k][type(self)._slices_for_variable(ds[k], dimensions_to_slices)] for k in self.variable]
         updated_dims, updated_dims_indices = MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
         logger.debug(f'filling the data_subset with NaN')
@@ -56,6 +68,12 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
         data_subset = data_subset.transpose(updated_dims_indices)
         logger.debug(f'adding summary.data_dim_names')
         input_tile.summary.data_dim_names.extend(updated_dims)
+        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+        '''
+
+        end = time.time()
+        logger.debug("processing time %f", end-start)
+
         if self.depth:
             depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                           dimensions_to_slices).items())[0]
@@ -80,7 +98,27 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
 
         new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
         new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
-        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
 
         input_tile.tile.grid_multi_variable_tile.CopyFrom(new_tile)
         return input_tile
+
+    def _get_variable_values(self, ds, dimensions_to_slices):
+        variable_iterator = iter(self.variable)
+        v = next(variable_iterator)
+        v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+        new_shape = list(v_data_subset.shape)
+        new_shape.append(1)
+        data_subset_variables = np.reshape(v_data_subset.values, new_shape)
+        try:
+            while True:
+                v = next(variable_iterator)
+                v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+                v_data_subset_values = np.reshape(v_data_subset.values, new_shape)
+                data_subset_variables = np.concatenate([data_subset_variables, v_data_subset_values], axis=3)
+        except StopIteration:
+            pass
+
+        dims = list(v_data_subset.dims)
+        dims.extend(MultiBandUtils.BAND)
+
+        return data_subset_variables, dims
\ No newline at end of file
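
The new _get_variable_values builds the banded array by reshaping the first variable and then concatenating each remaining variable along axis 3. An equivalent, more idiomatic sketch using np.stack, intended as a drop-in method body for the class above, under the same implicit assumption the loop makes (every name in self.variable selects data of the same shape and dimensions):

    import numpy as np

    def _get_variable_values(self, ds, dimensions_to_slices):
        subsets = [ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
                   for v in self.variable]
        # Stack along a new trailing "band" axis in one call instead of
        # reshaping and concatenating one variable at a time.
        data_subset_variables = np.stack([s.values for s in subsets], axis=-1)
        dims = list(subsets[-1].dims)
        dims.extend(MultiBandUtils.BAND)
        return data_subset_variables, dims

np.stack with axis=-1 also removes the hard-coded axis=3, so the same code works for variables of any rank.
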
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 26311af..6fca80f 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -9,3 +9,7 @@ class MetadataStore(HealthCheck, ABC):
     @abstractmethod
     def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
+
+    @abstractmethod
+    def commit(self) -> None:
+        pass
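
Any MetadataStore subclass now has to implement commit() as well as save_metadata(). A minimal sketch of a conforming in-memory store, ignoring the HealthCheck side of the interface for brevity (the class name and fields are illustrative, and imports are omitted):

    class InMemoryMetadataStore(MetadataStore):
        """Buffers tiles in memory; commit() makes them 'visible'."""

        def __init__(self):
            self._pending = []
            self._committed = []

        def save_metadata(self, nexus_tile) -> None:
            self._pending.append(nexus_tile)   # buffered, not yet committed

        def commit(self) -> None:
            self._committed.extend(self._pending)
            self._pending.clear()
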
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..02a568d 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -86,7 +86,7 @@ class SolrStore(MetadataStore):
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
             self._set_solr_status(zk)
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+            return pysolr.SolrCloud(zk, self._collection)
         elif self._solr_url:
             return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
         else:
@@ -116,7 +116,14 @@ class SolrStore(MetadataStore):
         try:
             self._solr.add([doc])
         except pysolr.SolrError as e:
-            logger.exception(f'Lost connection to Solr, and cannot save tiles. cause: {e}. creating SolrLostConnectionError')
+            logger.exception(f'Lost connection to Solr while adding a document; cannot save tiles. cause: {e}. Raising SolrLostConnectionError')
+            raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
+
+    def commit(self):
+        try:
+            self._solr.commit()
+        except pysolr.SolrError as e:
+            logger.exception(f'Lost connection to Solr while committing; cannot save tiles. cause: {e}. Raising SolrLostConnectionError')
             raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
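
With always_commit removed from the SolrCloud connection, Solr holds added documents in an uncommitted state until an explicit commit, which the commit() method added above issues once per granule. (The standalone-Solr branch above still passes always_commit=True, so the deferred commit only takes effect on the SolrCloud path.) A minimal sketch of the deferred-commit usage with pysolr; the URL, collection, and documents are illustrative:

    import pysolr

    # always_commit defaults to False, so adds are deferred until commit().
    solr = pysolr.Solr('http://localhost:8983/solr/nexustiles')
    solr.add([{'id': 'tile-1'}, {'id': 'tile-2'}])  # not yet searchable
    solr.commit()  # one commit makes both documents searchable
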
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 92f31f3..03013af 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,8 +1,19 @@
-cassandra-driver==3.23.0
-aiomultiprocess==0.7.0
-aioboto3==8.0.5
-tblib==1.6.0
-pysolr==3.9.0
-kazoo==2.8.0
-aio-pika==6.7.1
-elasticsearch[async]
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+tenacity==8.0
+elasticsearch[async]==7.15
+netcdf4==1.5
+numpy==1.21
+scipy==1.7
+pandas==1.3
+pytz==2019.3
+xarray==0.20
+pyyaml==5.3
+urllib3==1.24
+aiohttp==3.6
diff --git a/granule_ingester/requirements2.txt b/granule_ingester/requirements2.txt
new file mode 100644
index 0000000..a9f6e18
--- /dev/null
+++ b/granule_ingester/requirements2.txt
@@ -0,0 +1,9 @@
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+elasticsearch[async]==7.15
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
index 2a5920e..0650490 100644
--- a/granule_ingester/setup.py
+++ b/granule_ingester/setup.py
@@ -5,10 +5,10 @@ from setuptools import setup, find_packages
 with open('requirements.txt') as f:
     pip_requirements = f.readlines()
 
-try:
-    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-except (CalledProcessError, IOError) as e:
-    raise EnvironmentError("Error installing conda packages", e)
+#try:
+#    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+#except (CalledProcessError, IOError) as e:
+#    raise EnvironmentError("Error installing conda packages", e)
 
 __version__ = '1.0.0-SNAPSHOT'
 
diff --git a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
index 3f78114..75ec707 100644
--- a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
@@ -43,7 +43,7 @@ class TestTileSummarizingProcessor(unittest.TestCase):
             output_tile = reading_processor._generate_tile(ds, dims, input_tile)
             tile_summary_processor = TileSummarizingProcessor('test')
             new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
-            self.assertEqual('"sea_surface_temperature"', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+            self.assertEqual('sea_surface_temperature', json.loads(new_tile.summary.standard_name)[0], f'wrong new_tile.summary.standard_name')
 
     def test_hls_single_var01(self):
         """
@@ -74,8 +74,8 @@ class TestTileSummarizingProcessor(unittest.TestCase):
             output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
             tile_summary_processor = TileSummarizingProcessor('test')
             new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
-            self.assertEqual('null', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
-            self.assertEqual(None, json.loads(new_tile.summary.standard_name), f'unable to convert new_tile.summary.standard_name from JSON')
+            self.assertEqual('[null]', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+            self.assertEqual(None, json.loads(new_tile.summary.standard_name)[0], f'unable to convert new_tile.summary.standard_name from JSON')
             self.assertTrue(abs(new_tile.summary.stats.mean - 0.26137) < 0.001, f'mean value is not close expected: 0.26137. actual: {new_tile.summary.stats.mean}')
 
     def test_hls_multiple_var_01(self):
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
index 7a8dd51..d9094eb 100644
--- a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -15,7 +15,7 @@ class TestSliceFileByStepSize(unittest.TestCase):
             slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
             slices = slicer._generate_slices(dimension_specs=dataset.dims)
             expected_slices = [
-                'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                'time:0:1,nv:0:2,depth:0:2,longitude:0:180,latitude:0:180',
                 'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
                 'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
                 'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',