Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 20:23:25 UTC
[incubator-sdap-ingester] branch rabbitmq-fix updated (c36ffaf -> b88d462)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a change to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.
discard c36ffaf undo
discard 0eed399 better error handling
add 68110d0 SDAP-273: Configure max threads in Granule Ingester (#13)
add d94c89f SDAP-266: add README note on synchronization of configmap with local path (#14)
add fe6a1c5 SDAP-272: Support connecting to Solr through Zk in Granule Ingester (#12)
new c54af96 better error handling
new b88d462 undo
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (c36ffaf)
            \
             N -- N -- N   refs/heads/rabbitmq-fix (b88d462)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
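
For readers unfamiliar with this situation: a history like the diagram above can be produced with ordinary git commands. A minimal sketch, with <B> standing in for the common base commit; the concrete commands below are an illustration of how such a push can arise, not a record of what was actually run:

    git checkout rabbitmq-fix
    git reset --hard <B>                   # rewind, discarding the O revisions (0eed399, c36ffaf)
    git cherry-pick <fixed-commits>        # recreate the work as the N revisions (c54af96, b88d462)
    git push --force origin rabbitmq-fix   # rewrite the remote branch, triggering this notification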
Summary of changes:
config_operator/README.md | 14 ++++++++--
config_operator/config_operator/main.py | 17 +++++++-----
granule_ingester/docker/entrypoint.sh | 21 +++++++-------
.../granule_ingester/consumer/Consumer.py | 15 ++++++----
granule_ingester/granule_ingester/main.py | 24 +++++++++-------
.../granule_ingester/pipeline/Pipeline.py | 32 +++++++++++++---------
.../granule_ingester/writers/SolrStore.py | 2 +-
7 files changed, 77 insertions(+), 48 deletions(-)
[incubator-sdap-ingester] 02/02: undo
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit b88d4627e5942aca3b6f15f135529e6a826306e5
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 16:15:35 2020 -0700
undo
---
granule_ingester/granule_ingester/main.py | 2 --
1 file changed, 2 deletions(-)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index ecc9d40..15390fd 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -113,8 +113,6 @@ async def main(loop):
cassandra_password = args.cassandra_password
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
- cassandra_username = args.cassandra_username
- cassandra_password = args.cassandra_password
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
[incubator-sdap-ingester] 01/02: better error handling
Posted by ea...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit c54af9691c2653599540b23cf79109c99b995d72
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 8 20:16:29 2020 -0500
better error handling
better exception handling
error handling
the healthchecks now raise exceptions if they fail
propagate child worker exceptions up to main process
exc handling
error handling
use pysolr
solr history bug fixes
use asyncio in the collection ingester
---
granule_ingester/conda-requirements.txt | 2 +-
.../granule_ingester/consumer/Consumer.py | 40 +++++--
granule_ingester/granule_ingester/main.py | 2 +
.../granule_ingester/pipeline/Pipeline.py | 133 +++++++++++++--------
.../reading_processors/TileReadingProcessor.py | 20 ++--
.../granule_ingester/writers/DataStore.py | 1 +
granule_ingester/tests/pipeline/test_Pipeline.py | 9 +-
7 files changed, 134 insertions(+), 73 deletions(-)
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
pyyaml==5.3.1
requests==2.23.0
aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
tenacity
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 5df51fe..6c72837 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,6 +17,8 @@ import logging
import aio_pika
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+ RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline
@@ -39,7 +41,7 @@ class Consumer(HealthCheck):
self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
password=rabbitmq_password,
host=rabbitmq_host)
- self._connection = None
+ self._connection: aio_pika.Connection = None
async def health_check(self) -> bool:
try:
@@ -47,10 +49,10 @@ class Consumer(HealthCheck):
await connection.close()
return True
except Exception:
- logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
- return False
+ raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
+ f"Connection string was {self._connection_string}")
- async def _get_connection(self):
+ async def _get_connection(self) -> aio_pika.Connection:
return await aio_pika.connect_robust(self._connection_string)
async def __aenter__(self):
@@ -75,19 +77,37 @@ class Consumer(HealthCheck):
metadata_store_factory=metadata_store_factory,
max_concurrency=pipeline_max_concurrency)
await pipeline.run()
- message.ack()
+ await message.ack()
+ except PipelineBuildingError as e:
+ await message.reject()
+ logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
+ f"from RabbitMQ. The exception was:\n{e}")
+ except PipelineRunningError as e:
+ await message.reject()
+ logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+ except LostConnectionError:
+ # Let main() handle this
+ raise
except Exception as e:
- message.reject(requeue=True)
- logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
+ await message.reject(requeue=True)
+ logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
async def start_consuming(self, pipeline_max_concurrency=16):
channel = await self._connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
- async with queue.iterator() as queue_iter:
- async for message in queue_iter:
+ queue_iter = queue.iterator()
+ async for message in queue_iter:
+ try:
await self._received_message(message,
self._data_store_factory,
self._metadata_store_factory,
pipeline_max_concurrency)
+ except aio_pika.exceptions.MessageProcessError:
+ # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+ # connection has died, and attempting to close the queue will only raise another exception.
+ raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
+ except Exception as e:
+ await queue_iter.close()
+ await channel.close()
+ raise e
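
The LostConnectionError re-raise above is deliberate: once the RabbitMQ connection has died, the consumer cannot clean up safely, so the exception is propagated for the caller to deal with. This email does not include the corresponding main() changes, so the following is only a hedged sketch of what the supervising side of this pattern might look like; the run_consumer wrapper and its exit behavior are assumptions, not SDAP code:

    import logging
    import sys

    from granule_ingester.exceptions import LostConnectionError

    logger = logging.getLogger(__name__)


    async def run_consumer(consumer, max_concurrency: int = 16):
        try:
            async with consumer:  # __aenter__ opens the RabbitMQ connection
                await consumer.start_consuming(pipeline_max_concurrency=max_concurrency)
        except LostConnectionError as e:
            # Exit nonzero so an orchestrator (e.g. Kubernetes) restarts the
            # process with a fresh connection, rather than reconnecting in-process.
            logger.error(f"Shutting down: {e}")
            sys.exit(1)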
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 15390fd..ecc9d40 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -113,6 +113,8 @@ async def main(loop):
cassandra_password = args.cassandra_password
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
+ cassandra_username = args.cassandra_username
+ cassandra_password = args.cassandra_password
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e1e53bf..3ff6822 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,38 +13,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
-import os
+import pickle
import time
+from multiprocessing import Manager
from typing import List
import xarray as xr
import yaml
-import aiomultiprocess
+from aiomultiprocess import Pool
+from aiomultiprocess.types import ProxyException
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
-from granule_ingester.pipeline.Modules import modules as processor_module_mappings
+from granule_ingester.pipeline.Modules import \
+ modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
from granule_ingester.slicers import TileSlicer
from granule_ingester.writers import DataStore, MetadataStore
from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
logger = logging.getLogger(__name__)
-MAX_QUEUE_SIZE = 2 ** 15 - 1
+# The aiomultiprocess library has a bug where it never closes out the pool if there are more than a certain
+# number of items to process. The exact number is unknown, but 2**8-1 is safe.
+MAX_CHUNK_SIZE = 2 ** 8 - 1
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
global _worker_data_store
global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
+ global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
# however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -52,19 +60,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
+ _shared_memory = shared_memory
async def _process_tile_in_worker(serialized_input_tile: str):
- global _worker_data_store
- global _worker_metadata_store
- global _worker_processor_list
- global _worker_dataset
+ try:
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- if processed_tile:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+ except Exception as e:
+ pickling_support.install(e)
+ _shared_memory.error = pickle.dumps(e)
+ raise
def _recurse(processor_list: List[TileProcessor],
@@ -91,25 +101,33 @@ class Pipeline:
self._metadata_store_factory = metadata_store_factory
self._max_concurrency = max_concurrency
- @classmethod
- def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
- config = yaml.load(config_str, yaml.FullLoader)
- return cls._build_pipeline(config,
- data_store_factory,
- metadata_store_factory,
- processor_module_mappings,
- max_concurrency)
+ # Create a SyncManager so that we can communicate exceptions from the
+ # worker processes back to the main process.
+ self._manager = Manager()
+
+ def __del__(self):
+ self._manager.shutdown()
@classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
- with open(config_path) as config_file:
- config = yaml.load(config_file, yaml.FullLoader)
+ def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
+ try:
+ config = yaml.load(config_str, yaml.FullLoader)
+ cls._validate_config(config)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
processor_module_mappings,
max_concurrency)
+ except yaml.scanner.ScannerError:
+ raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+
+ # TODO: this method should validate the config against an actual schema definition
+ @staticmethod
+ def _validate_config(config: dict):
+ if type(config) is not dict:
+ raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
@@ -117,17 +135,27 @@ class Pipeline:
metadata_store_factory,
module_mappings: dict,
max_concurrency: int):
- granule_loader = GranuleLoader(**config['granule'])
-
- slicer_config = config['slicer']
- slicer = cls._parse_module(slicer_config, module_mappings)
-
- tile_processors = []
- for processor_config in config['processors']:
- module = cls._parse_module(processor_config, module_mappings)
- tile_processors.append(module)
-
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
+ try:
+ granule_loader = GranuleLoader(**config['granule'])
+
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
+
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
+
+ return cls(granule_loader,
+ slicer,
+ data_store_factory,
+ metadata_store_factory,
+ tile_processors,
+ max_concurrency)
+ except KeyError as e:
+ raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.")
+ except Exception:
+ raise PipelineBuildingError("Cannot build pipeline.")
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -142,25 +170,36 @@ class Pipeline:
return processor_module
async def run(self):
+
+ logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
async with self._granule_loader as (dataset, granule_name):
start = time.perf_counter()
- async with aiomultiprocess.Pool(initializer=_init_worker,
- initargs=(self._tile_processors,
- dataset,
- self._data_store_factory,
- self._metadata_store_factory),
- maxtasksperchild=self._max_concurrency,
- childconcurrency=self._max_concurrency) as pool:
+
+ shared_memory = self._manager.Namespace()
+ async with Pool(initializer=_init_worker,
+ initargs=(self._tile_processors,
+ dataset,
+ self._data_store_factory,
+ self._metadata_store_factory,
+ shared_memory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
- await pool.map(_process_tile_in_worker, chunk)
+ for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
+ try:
+ await pool.map(_process_tile_in_worker, chunk)
+ except ProxyException:
+ pool.terminate()
+ # Give the shared memory manager some time to write the exception
+ # await asyncio.sleep(1)
+ raise pickle.loads(shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
@staticmethod
- def _chunk_list(items, chunk_size):
+ def _chunk_list(items, chunk_size: int):
return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
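
The Pipeline changes above combine three techniques: batching the task list so the multiprocessing queue never exceeds its size limit, a Manager().Namespace() used as shared memory, and tblib so that a worker's exception can be pickled with its traceback and re-raised in the parent. A stripped-down, self-contained sketch of the exception-propagation part, using the stdlib Pool instead of aiomultiprocess (all names here are illustrative, not SDAP code):

    import pickle
    from multiprocessing import Manager, Pool

    from tblib import pickling_support


    def process_item(shared, item):
        try:
            return 100 // item  # fails for item == 0
        except Exception as e:
            pickling_support.install(e)      # make the traceback picklable
            shared.error = pickle.dumps(e)   # stash it in shared memory
            raise


    if __name__ == "__main__":
        with Manager() as manager:
            shared = manager.Namespace()
            with Pool(2) as pool:
                try:
                    pool.starmap(process_item, [(shared, 5), (shared, 0)])
                except Exception:
                    # Re-raise the worker's original exception with its remote
                    # traceback intact, mirroring what Pipeline.run() does with
                    # shared_memory.error after a ProxyException.
                    raise pickle.loads(shared.error)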
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
import xarray as xr
from nexusproto import DataTile_pb2 as nexusproto
+from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
self.latitude = latitude
self.longitude = longitude
- # Common optional properties
- self.temp_dir = None
- self.metadata = None
- # self.temp_dir = self.environ['TEMP_DIR']
- # self.metadata = self.environ['META']
-
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
- dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+ try:
+ dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
- output_tile = nexusproto.NexusTile()
- output_tile.CopyFrom(tile)
- output_tile.summary.data_var_name = self.variable_to_read
+ output_tile = nexusproto.NexusTile()
+ output_tile.CopyFrom(tile)
+ output_tile.summary.data_var_name = self.variable_to_read
- return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+ except Exception:
+ raise TileProcessingError("Could not generate tiles from the granule.")
@abstractmethod
def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..a64399b 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck
class DataStore(HealthCheck, ABC):
+
@abstractmethod
def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py
index c18bf8b..34e66c6 100644
--- a/granule_ingester/tests/pipeline/test_Pipeline.py
+++ b/granule_ingester/tests/pipeline/test_Pipeline.py
@@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase):
pass
relative_path = "../config_files/ingestion_config_testfile.yaml"
- file_path = os.path.join(os.path.dirname(__file__), relative_path)
- pipeline = Pipeline.from_file(config_path=str(file_path),
- data_store_factory=MockDataStore,
- metadata_store_factory=MockMetadataStore)
+ with open(os.path.join(os.path.dirname(__file__), relative_path)) as file:
+ yaml_str = file.read()
+ pipeline = Pipeline.from_string(config_str=yaml_str,
+ data_store_factory=MockDataStore,
+ metadata_store_factory=MockMetadataStore)
self.assertEqual(pipeline._data_store_factory, MockDataStore)
self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
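
As the updated test shows, pipelines are now built from a YAML string rather than a file path. A hedged usage sketch, assuming only what the diff shows (the top-level 'granule', 'slicer', and 'processors' keys required by _build_pipeline, and PipelineBuildingError for syntax or missing-key failures); the inner field names, module names, and factories below are placeholders:

    from granule_ingester.exceptions import PipelineBuildingError
    from granule_ingester.pipeline import Pipeline

    # Placeholder config: only the three top-level keys are taken from the
    # diff; the values inside them are illustrative.
    config_str = """
    granule:
      resource: /data/example_granule.nc
    slicer:
      name: exampleSlicer
    processors:
      - name: exampleReadingProcessor
    """

    try:
        pipeline = Pipeline.from_string(config_str=config_str,
                                        data_store_factory=lambda: None,      # placeholder factory
                                        metadata_store_factory=lambda: None)  # placeholder factory
    except PipelineBuildingError as e:
        # A YAML syntax error or a missing top-level key lands here. With the
        # placeholder module names above, _parse_module will also fail and end
        # up here; real names come from granule_ingester.pipeline.Modules.
        print(f"Could not build pipeline: {e}")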