You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/04 22:49:20 UTC
[incubator-sdap-ingester] 05/11: propagate child worker exceptions
up to main process
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 11789f4ed7587d481c93aaf8b90b5a6f54b0dfc0
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 13:52:50 2020 -0700
propagate child worker exceptions up to main process
---
.../granule_ingester/exceptions/Exceptions.py | 6 +++
.../granule_ingester/exceptions/__init__.py | 1 +
.../granule_ingester/pipeline/Pipeline.py | 53 ++++++++++++++--------
granule_ingester/requirements.txt | 1 +
4 files changed, 41 insertions(+), 20 deletions(-)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index c648b99..7741ca6 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -25,6 +25,10 @@ class SolrLostConnectionError(LostConnectionError):
pass
+class CassandraConnectionError(Exception):
+ pass
+
+
class FailedHealthCheckError(Exception):
pass
@@ -39,3 +43,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
+
+
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index ea0969f..838ccff 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
+from .Exceptions import CassandraConnectionError
from .Exceptions import CassandraFailedHealthCheckError
from .Exceptions import CassandraLostConnectionError
from .Exceptions import FailedHealthCheckError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e52d99f..14dc032 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
+import pickle
import time
+from multiprocessing import Manager
from typing import List
import aiomultiprocess
import xarray as xr
import yaml
from aiomultiprocess.types import ProxyException
-from cassandra.cluster import NoHostAvailable
from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
from yaml.scanner import ScannerError
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+from granule_ingester.exceptions import PipelineBuildingError
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
@@ -41,13 +42,15 @@ _worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
_worker_processor_list: List[TileProcessor] = None
_worker_dataset = None
+_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
global _worker_data_store
global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
+ global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
# however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
+ _shared_memory = shared_memory
async def _process_tile_in_worker(serialized_input_tile: str):
- global _worker_data_store
- global _worker_metadata_store
- global _worker_processor_list
- global _worker_dataset
+ try:
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
- input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-
- if processed_tile:
- # try:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
- # except NoHostAvailable as e:
- # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+ except Exception as e:
+ pickling_support.install(e)
+ _shared_memory.error = pickle.dumps(e)
+ raise
def _recurse(processor_list: List[TileProcessor],
@@ -96,10 +97,15 @@ class Pipeline:
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ # Create a SyncManager Namespace so that we can communicate exceptions from the
+ # worker processes back to the main process.
+ self._shared_memory = Manager().Namespace()
+
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
try:
config = yaml.load(config_str, yaml.FullLoader)
+ cls._validate_config(config)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
@@ -108,6 +114,12 @@ class Pipeline:
except yaml.scanner.ScannerError:
raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
+ # TODO: this method should validate the config against an actual schema definition
+ @staticmethod
+ def _validate_config(config: dict):
+ if type(config) is not dict:
+ raise PipelineBuildingError("Cannot build pipeline because the config is not valid YAML.")
+
@classmethod
def _build_pipeline(cls,
config: dict,
@@ -150,17 +162,18 @@ class Pipeline:
initargs=(self._tile_processors,
dataset,
self._data_store_factory,
- self._metadata_store_factory)) as pool:
+ self._metadata_store_factory,
+ self._shared_memory)) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to batch it.
- for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+ for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
try:
await pool.map(_process_tile_in_worker, chunk)
except ProxyException:
pool.terminate()
- raise PipelineRunningError("Running the pipeline failed and could not recover.")
+ raise pickle.loads(self._shared_memory.error)
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index a6d64a2..0479f99 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,4 @@
cassandra-driver==3.23.0
aiomultiprocess==0.7.0
aioboto3
+tblib==1.6.0
\ No newline at end of file