You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:52 UTC
[incubator-sdap-ingester] 05/10: propagate child worker exceptions
up to main process
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit de8c5a00a7d8589f072ac5865cdde102864ae298
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 13:52:50 2020 -0700
propagate child worker exceptions up to main process
---
.../services/MessagePublisher.py | 2 +-
.../granule_ingester/exceptions/Exceptions.py | 6 +++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 53 ++++++++++++++--------
.../granule_ingester/writers/CassandraStore.py | 17 ++++---
granule_ingester/requirements.txt | 1 +
6 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..f7a5517 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -27,7 +27,7 @@ class MessagePublisher:
:return: None
"""
properties = pika.BasicProperties(content_type='text/plain',
- delivery_mode=1,
+ delivery_mode=2,
priority=priority)
self._channel.basic_publish(exchange='',
routing_key=self._queue,
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 6e7d89a..f43bc2f 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -14,6 +14,10 @@ class RabbitMQConnectionError(Exception):
pass
+class CassandraConnectionError(Exception):
+ pass
+
+
class FailedHealthCheckError(Exception):
pass
@@ -28,3 +32,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
+
+
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 2ba1b4a..400c9bf 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
+from .Exceptions import CassandraConnectionError
from .Exceptions import CassandraFailedHealthCheckError
from .Exceptions import FailedHealthCheckError
from .Exceptions import PipelineBuildingError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e52d99f..14dc032 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
+import pickle
import time
+from multiprocessing import Manager
from typing import List
import aiomultiprocess
import xarray as xr
import yaml
from aiomultiprocess.types import ProxyException
-from cassandra.cluster import NoHostAvailable
from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -41,13 +42,15 @@ _worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
global _worker_data_store
global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
+ global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
# however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
+ _shared_memory = shared_memory
async def _process_tile_in_worker(serialized_input_tile: str):
- global _worker_data_store
- global _worker_metadata_store
- global _worker_processor_list
- global _worker_dataset
+ try:
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-
- if processed_tile:
- # try:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
- # except NoHostAvailable as e:
- # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+ except Exception as e:
+ pickling_support.install(e)
+ _shared_memory.error = pickle.dumps(e)
+ raise
def _recurse(processor_list: List[TileProcessor],
@@ -96,10 +97,15 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ # Create a SyncManager Namespace so that we can communicate exceptions from the
+ # worker processes back to the main process.
+ self._shared_memory = Manager().Namespace()
+
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
try:
config = yaml.load(config_str, yaml.FullLoader)
+ cls._validate_config(config)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
@@ -108,6 +114,12 @@ class Pipeline:
except yaml.scanner.ScannerError:
raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+ # TODO: this method should validate the config against an actual schema definition
+ @staticmethod
+ def _validate_config(config: dict):
+ if type(config) is not dict:
+ raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
@@ -150,17 +162,18 @@ class Pipeline:
initargs=(self._tile_processors,
dataset,
self._data_store_factory,
- self._metadata_store_factory)) as pool:
+ self._metadata_store_factory,
+ self._shared_memory)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+ for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
try:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
- raise PipelineRunningError("Running the pipeline failed and could not recover.")
+ raise pickle.loads(self._shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 530871d..fbb5a7d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -21,9 +21,10 @@ import uuid
from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
from nexusproto.DataTile_pb2 import NexusTile, TileData
-from granule_ingester.exceptions import CassandraFailedHealthCheckError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
from granule_ingester.writers.DataStore import DataStore
logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -52,7 +53,11 @@ class CassandraStore(DataStore):
raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
def _get_session(self) -> Session:
- cluster = Cluster(contact_points=self._contact_points, port=self._port)
+ cluster = Cluster(contact_points=self._contact_points,
+ port=self._port,
+ # load_balancing_policy=
+ reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+ default_retry_policy=RetryPolicy())
session = cluster.connect()
session.set_keyspace('nexustiles')
return session
@@ -69,10 +74,10 @@ class CassandraStore(DataStore):
tile_id = uuid.UUID(tile.summary.tile_id)
serialized_tile_data = TileData.SerializeToString(tile.tile)
prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
- await type(self)._execute_query_async(self._session, prepared_query,
- [tile_id, bytearray(serialized_tile_data)])
- except NoHostAvailable as e:
- logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
+ await self._execute_query_async(self._session, prepared_query,
+ [tile_id, bytearray(serialized_tile_data)])
+ except Exception:
+ raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.")
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index a6d64a2..0479f99 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,4 @@
cassandra-driver==3.23.0
aiomultiprocess==0.7.0
aioboto3
+tblib==1.6.0
\ No newline at end of file