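You are viewing a plain-text version of this content. The canonical (HTML) version is in the commits@sdap.apache.org mailing-list archive; its hyperlink was not preserved in this plain-text copy.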
Posted to commits@sdap.apache.org by tl...@apache.org on 2021/12/09 19:47:24 UTC
[incubator-sdap-ingester] 01/02: remove autocommit in solr connection, commit after granule is fully processed
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch less_solr_commit
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 4aa0ff6dc85a9ffa80a43fe28da52ac7a01be7b6
Author: Thomas Loubrieu <lo...@jpl.nasa.gov>
AuthorDate: Wed Dec 8 19:41:26 2021 -0500
remove autocommit in solr connection, commit after granule is fully processed
---
granule_ingester/README.md | 10 +++++-
granule_ingester/conda-requirements.txt | 10 ------
granule_ingester/granule_ingester/main.py | 2 +-
.../granule_ingester/pipeline/Pipeline.py | 7 ++--
.../processors/ForceAscendingLatitude.py | 1 -
.../GridMultiVariableReadingProcessor.py | 40 +++++++++++++++++++++-
.../granule_ingester/writers/MetadataStore.py | 4 +++
.../granule_ingester/writers/SolrStore.py | 11 ++++--
granule_ingester/requirements.txt | 27 ++++++++++-----
granule_ingester/requirements2.txt | 9 +++++
granule_ingester/setup.py | 8 ++---
.../test_TileSummarizingProcessor.py | 6 ++--
.../tests/slicers/test_SliceFileByStepSize.py | 2 +-
13 files changed, 102 insertions(+), 35 deletions(-)
diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 1339835..34d292b 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -9,7 +9,15 @@ data to Cassandra and Solr.
## Prerequisites
-Python 3.7
+Python 3.7, conda
+
+Create a virtual environment:
+
+ conda create -n nexus-ingester python=3.7
+
+Activate it:
+
+ conda activate nexus-ingester
## Building the service
From `incubator-sdap-ingester`, run:
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index 810e278..e69de29 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -1,10 +0,0 @@
-numpy==1.15.4
-scipy
-netcdf4==1.5.3
-pandas==1.0.4
-pytz==2019.3
-xarray
-pyyaml==5.3.1
-requests==2.23.0
-aiohttp==3.6.2
-tenacity
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 84ab004..029df1c 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -225,7 +225,7 @@ async def main(loop):
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming(args.max_threads)
+ await consumer.start_consuming(int(args.max_threads))
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
except LostConnectionError as e:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index abc07a0..9781a0f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -101,7 +101,7 @@ class Pipeline:
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
- self._max_concurrency = max_concurrency
+ self._max_concurrency: int = max_concurrency
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
@@ -189,8 +189,8 @@ class Pipeline:
self._data_store_factory,
self._metadata_store_factory,
shared_memory),
- maxtasksperchild=self._max_concurrency,
- childconcurrency=self._max_concurrency) as pool:
+ maxtasksperchild=int(self._max_concurrency),
+ childconcurrency=int(self._max_concurrency)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
@@ -204,6 +204,7 @@ class Pipeline:
# await asyncio.sleep(1)
raise pickle.loads(shared_memory.error)
+ self._metadata_store_factory.commit()
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
index 9dc3407..dac9646 100644
--- a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -40,7 +40,6 @@ class ForceAscendingLatitude(TileProcessor):
def process(self, tile, *args, **kwargs):
"""
This method will reverse the ordering of latitude values in a tile if necessary to ensure that the latitude values are ascending.
-
:param self:
:param tile: The nexus_tile
:return: Tile data with altered latitude values
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index c36b8d2..cde440d 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -1,5 +1,7 @@
import logging
+import time
from typing import Dict
+from itertools import takewhile
import cftime
import numpy as np
@@ -48,6 +50,16 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
logger.debug(f'reading as banded grid as self.variable is a list. self.variable: {self.variable}')
if len(self.variable) < 1:
raise ValueError(f'list of variable is empty. Need at least 1 variable')
+
+ start = time.time()
+
+
+ data_subset_variables, data_subset_variables_dims = self._get_variable_values(ds, dimensions_to_slices)
+ input_tile.summary.data_dim_names.extend(data_subset_variables_dims)
+ data_subset_variables = np.ma.filled(data_subset_variables, np.NaN)
+ new_tile.variable_data.CopyFrom(to_shaped_array(data_subset_variables))
+
+ '''
data_subset = [ds[k][type(self)._slices_for_variable(ds[k], dimensions_to_slices)] for k in self.variable]
updated_dims, updated_dims_indices = MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
logger.debug(f'filling the data_subset with NaN')
@@ -56,6 +68,12 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
data_subset = data_subset.transpose(updated_dims_indices)
logger.debug(f'adding summary.data_dim_names')
input_tile.summary.data_dim_names.extend(updated_dims)
+ new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+ '''
+
+ end = time.time()
+ logger.debug("processing time %f", end-start)
+
if self.depth:
depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
dimensions_to_slices).items())[0]
@@ -80,7 +98,27 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
- new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
input_tile.tile.grid_multi_variable_tile.CopyFrom(new_tile)
return input_tile
+
+ def _get_variable_values(self, ds, dimensions_to_slices):
+ variable_iterator = iter(self.variable)
+ v = next(variable_iterator)
+ v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+ new_shape = list(v_data_subset.shape)
+ new_shape.append(1)
+ data_subset_variables = np.reshape(v_data_subset.values, new_shape)
+ try:
+ while True:
+ v = next(variable_iterator)
+ v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+ v_data_subset_values = np.reshape(v_data_subset.values, new_shape)
+ data_subset_variables = np.concatenate([data_subset_variables, v_data_subset_values], axis=3)
+ except StopIteration:
+ pass
+
+ dims = list(v_data_subset.dims)
+ dims.extend(MultiBandUtils.BAND)
+
+ return data_subset_variables, dims
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 26311af..6fca80f 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -9,3 +9,7 @@ class MetadataStore(HealthCheck, ABC):
@abstractmethod
def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
+
+ @abstractmethod
+ def commit(self) -> None:
+ pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..02a568d 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -86,7 +86,7 @@ class SolrStore(MetadataStore):
if self._zk_url:
zk = pysolr.ZooKeeper(f"{self._zk_url}")
self._set_solr_status(zk)
- return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+ return pysolr.SolrCloud(zk, self._collection)
elif self._solr_url:
return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
else:
@@ -116,7 +116,14 @@ class SolrStore(MetadataStore):
try:
self._solr.add([doc])
except pysolr.SolrError as e:
- logger.exception(f'Lost connection to Solr, and cannot save tiles. cause: {e}. creating SolrLostConnectionError')
+ logger.exception(f'Lost connection to Solr, and cannot save tiles while adding doc. cause: {e}. creating SolrLostConnectionError')
+ raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
+
+ def _commit(self):
+ try:
+ self._solr.commit()
+ except pysolr.SolrError as e:
+ logger.exception(f'Lost connection to Solr, and cannot save tiles while commiting. cause: {e}. creating SolrLostConnectionError')
raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
def _build_solr_doc(self, tile: NexusTile) -> Dict:
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 92f31f3..03013af 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,8 +1,19 @@
-cassandra-driver==3.23.0
-aiomultiprocess==0.7.0
-aioboto3==8.0.5
-tblib==1.6.0
-pysolr==3.9.0
-kazoo==2.8.0
-aio-pika==6.7.1
-elasticsearch[async]
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+tenacity==8.0
+elasticsearch[async]==7.15
+netcdf4==1.5
+numpy==1.21
+scipy==1.7
+pandas==1.3
+pytz==2019.3
+xarray==0.20
+pyyaml==5.3
+urllib3==1.24
+aiohttp==3.6
diff --git a/granule_ingester/requirements2.txt b/granule_ingester/requirements2.txt
new file mode 100644
index 0000000..a9f6e18
--- /dev/null
+++ b/granule_ingester/requirements2.txt
@@ -0,0 +1,9 @@
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+elasticsearch[async]==7.15
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
index 2a5920e..0650490 100644
--- a/granule_ingester/setup.py
+++ b/granule_ingester/setup.py
@@ -5,10 +5,10 @@ from setuptools import setup, find_packages
with open('requirements.txt') as f:
pip_requirements = f.readlines()
-try:
- check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-except (CalledProcessError, IOError) as e:
- raise EnvironmentError("Error installing conda packages", e)
+#try:
+# check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+#except (CalledProcessError, IOError) as e:
+# raise EnvironmentError("Error installing conda packages", e)
__version__ = '1.0.0-SNAPSHOT'
diff --git a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
index 3f78114..75ec707 100644
--- a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
@@ -43,7 +43,7 @@ class TestTileSummarizingProcessor(unittest.TestCase):
output_tile = reading_processor._generate_tile(ds, dims, input_tile)
tile_summary_processor = TileSummarizingProcessor('test')
new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
- self.assertEqual('"sea_surface_temperature"', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+ self.assertEqual('sea_surface_temperature', eval(new_tile.summary.standard_name)[0], f'wrong new_tile.summary.standard_name')
def test_hls_single_var01(self):
"""
@@ -74,8 +74,8 @@ class TestTileSummarizingProcessor(unittest.TestCase):
output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
tile_summary_processor = TileSummarizingProcessor('test')
new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
- self.assertEqual('null', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
- self.assertEqual(None, json.loads(new_tile.summary.standard_name), f'unable to convert new_tile.summary.standard_name from JSON')
+ self.assertEqual('[null]', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+ self.assertEqual(None, json.loads(new_tile.summary.standard_name)[0], f'unable to convert new_tile.summary.standard_name from JSON')
self.assertTrue(abs(new_tile.summary.stats.mean - 0.26137) < 0.001, f'mean value is not close expected: 0.26137. actual: {new_tile.summary.stats.mean}')
def test_hls_multiple_var_01(self):
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
index 7a8dd51..d9094eb 100644
--- a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -15,7 +15,7 @@ class TestSliceFileByStepSize(unittest.TestCase):
slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
slices = slicer._generate_slices(dimension_specs=dataset.dims)
expected_slices = [
- 'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+ 'time:0:1,nv:0:2,depth:0:2,longitude:0:180,latitude:0:180',
'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',