Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/30 01:14:52 UTC

[incubator-sdap-ingester] 05/10: propagate child worker exceptions up to main process

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit de8c5a00a7d8589f072ac5865cdde102864ae298
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Tue Jul 14 13:52:50 2020 -0700

    propagate child worker exceptions up to main process
---
 .../services/MessagePublisher.py                   |  2 +-
 .../granule_ingester/exceptions/Exceptions.py      |  6 +++
 .../granule_ingester/exceptions/__init__.py        |  1 +
 .../granule_ingester/pipeline/Pipeline.py          | 53 ++++++++++++++--------
 .../granule_ingester/writers/CassandraStore.py     | 17 ++++---
 granule_ingester/requirements.txt                  |  1 +
 6 files changed, 53 insertions(+), 27 deletions(-)
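
The heart of this commit is the pattern added to Pipeline.py: when a worker
hits an exception, it pickles the exception (using tblib so the traceback
survives pickling) into a multiprocessing Manager Namespace, and the main
process unpickles and re-raises it when aiomultiprocess surfaces only an
opaque ProxyException. A minimal standalone sketch of the pattern (the names
here are illustrative, not the actual Pipeline code; with a stdlib Pool the
exception itself already reaches the parent, but the shared Namespace is what
preserves it under aiomultiprocess):

    import pickle
    from multiprocessing import Manager, Pool

    from tblib import pickling_support

    _shared = None

    def _init(shared):
        global _shared
        _shared = shared

    def _work(x):
        try:
            return 1 / x
        except Exception as e:
            pickling_support.install(e)      # make the traceback picklable
            _shared.error = pickle.dumps(e)  # hand the real error to the parent
            raise

    if __name__ == '__main__':
        shared = Manager().Namespace()
        with Pool(2, initializer=_init, initargs=(shared,)) as pool:
            try:
                pool.map(_work, [1, 0])
            except Exception:
                # Re-raise the worker's original exception, traceback intact.
                raise pickle.loads(shared.error)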

diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 559a69d..f7a5517 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -27,7 +27,7 @@ class MessagePublisher:
         :return: None
         """
         properties = pika.BasicProperties(content_type='text/plain',
-                                          delivery_mode=1,
+                                          delivery_mode=2,
                                           priority=priority)
         self._channel.basic_publish(exchange='',
                                     routing_key=self._queue,
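
For context on the one-line change above: AMQP delivery_mode=1 marks a message
transient, while delivery_mode=2 marks it persistent, meaning the broker
writes it to disk so it can survive a broker restart, provided the queue
itself is durable. A standalone pika sketch of the same idea (the queue name
and connection details are illustrative, not this repository's config):

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='granules', durable=True)  # durable queue
    channel.basic_publish(
        exchange='',
        routing_key='granules',
        body=b'example payload',
        properties=pika.BasicProperties(content_type='text/plain',
                                        delivery_mode=2))  # persistent message
    connection.close()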
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 6e7d89a..f43bc2f 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -14,6 +14,10 @@ class RabbitMQConnectionError(Exception):
     pass
 
 
+class CassandraConnectionError(Exception):
+    pass
+
+
 class FailedHealthCheckError(Exception):
     pass
 
@@ -28,3 +32,5 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
 
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
+
+
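
The new CassandraConnectionError sits alongside the existing
FailedHealthCheckError hierarchy, so callers can tell a connection lost
mid-ingestion apart from a dependency that failed its startup health check.
A hypothetical caller might branch like this (ingest_granule and the
pipeline.run(...) call shape are assumptions, not code from this repository):

    import logging

    from granule_ingester.exceptions import (CassandraConnectionError,
                                             FailedHealthCheckError)

    logger = logging.getLogger(__name__)

    async def ingest_granule(pipeline, granule_name):
        try:
            await pipeline.run(granule_name)
        except FailedHealthCheckError as e:
            # Any Cassandra/Solr/RabbitMQ health-check subclass lands here.
            logger.error(f"A dependency failed its health check: {e}")
            raise
        except CassandraConnectionError as e:
            logger.error(f"Lost the Cassandra connection during ingestion: {e}")
            raise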
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 2ba1b4a..400c9bf 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,3 +1,4 @@
+from .Exceptions import CassandraConnectionError
 from .Exceptions import CassandraFailedHealthCheckError
 from .Exceptions import FailedHealthCheckError
 from .Exceptions import PipelineBuildingError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index e52d99f..14dc032 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -13,20 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import logging
+import pickle
 import time
+from multiprocessing import Manager
 from typing import List
 
 import aiomultiprocess
 import xarray as xr
 import yaml
 from aiomultiprocess.types import ProxyException
-from cassandra.cluster import NoHostAvailable
 from nexusproto import DataTile_pb2 as nexusproto
+from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -41,13 +42,15 @@ _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
 _worker_processor_list: List[TileProcessor] = None
 _worker_dataset = None
+_shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
     global _worker_data_store
     global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
+    global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
     # however, the OS closes these sockets automatically once the worker processes die, so there is no need to close them explicitly.
@@ -55,23 +58,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
+    _shared_memory = shared_memory
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
-    global _worker_data_store
-    global _worker_metadata_store
-    global _worker_processor_list
-    global _worker_dataset
+    try:
+        input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+        processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
-    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-    processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
-
-    if processed_tile:
-        # try:
-        await _worker_data_store.save_data(processed_tile)
-        await _worker_metadata_store.save_metadata(processed_tile)
-        # except NoHostAvailable as e:
-        #     logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
+        if processed_tile:
+            await _worker_data_store.save_data(processed_tile)
+            await _worker_metadata_store.save_metadata(processed_tile)
+    except Exception as e:
+        pickling_support.install(e)
+        _shared_memory.error = pickle.dumps(e)
+        raise
 
 
 def _recurse(processor_list: List[TileProcessor],
@@ -96,10 +97,15 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        # Create a SyncManager Namespace so that we can communicate exceptions from the
+        # worker processes back to the main process.
+        self._shared_memory = Manager().Namespace()
+
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
         try:
             config = yaml.load(config_str, yaml.FullLoader)
+            cls._validate_config(config)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
@@ -108,6 +114,12 @@ class Pipeline:
         except yaml.scanner.ScannerError:
             raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.")
 
+    # TODO: this method should validate the config against an actual schema definition
+    @staticmethod
+    def _validate_config(config: dict):
+        if type(config) is not dict:
+            raise PipelineBuildingError("Cannot build pipeline because the config YAML is not a mapping.")
+
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
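
The TODO above says _validate_config should eventually check the config
against a real schema definition. One way that could look, sketched with the
jsonschema package (not currently in requirements.txt; the schema keys shown
are illustrative, not the pipeline's actual spec):

    import jsonschema

    from granule_ingester.exceptions import PipelineBuildingError

    # Illustrative schema; the real pipeline config keys would go here.
    _CONFIG_SCHEMA = {
        "type": "object",
        "properties": {
            "granule": {"type": "object"},
            "slicer": {"type": "object"},
            "processors": {"type": "array"},
        },
        "required": ["granule", "slicer", "processors"],
    }

    def _validate_config(config: dict):
        try:
            jsonschema.validate(instance=config, schema=_CONFIG_SCHEMA)
        except jsonschema.ValidationError as e:
            raise PipelineBuildingError(f"Invalid pipeline config: {e.message}") from e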
@@ -150,17 +162,18 @@ class Pipeline:
                                             initargs=(self._tile_processors,
                                                       dataset,
                                                       self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+                                                      self._metadata_store_factory,
+                                                      self._shared_memory)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
-                for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
+                for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
                     try:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
-                        raise PipelineRunningError("Running the pipeline failed and could not recover.")
+                        raise pickle.loads(self._shared_memory.error)
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
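
The batching comment in the hunk above refers to a _chunk_list helper whose
body falls outside this diff; a sketch of what such a helper typically looks
like (an assumed implementation, not the project's actual code):

    from typing import List

    MAX_QUEUE_SIZE = 2 ** 15 - 1  # stdlib multiprocessing queue task limit

    def _chunk_list(items: List, chunk_size: int = MAX_QUEUE_SIZE):
        """Yield successive chunk_size-sized slices of items."""
        for i in range(0, len(items), chunk_size):
            yield items[i:i + chunk_size]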
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 530871d..fbb5a7d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -21,9 +21,10 @@ import uuid
 from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
+from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
-from granule_ingester.exceptions import CassandraFailedHealthCheckError
+from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError
 from granule_ingester.writers.DataStore import DataStore
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
@@ -52,7 +53,11 @@ class CassandraStore(DataStore):
             raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
 
     def _get_session(self) -> Session:
-        cluster = Cluster(contact_points=self._contact_points, port=self._port)
+        cluster = Cluster(contact_points=self._contact_points,
+                          port=self._port,
+                          # load_balancing_policy=
+                          reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
+                          default_retry_policy=RetryPolicy())
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
@@ -69,10 +74,10 @@ class CassandraStore(DataStore):
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
             prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
-            await type(self)._execute_query_async(self._session, prepared_query,
-                                                  [tile_id, bytearray(serialized_tile_data)])
-        except NoHostAvailable as e:
-            logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}")
+            await self._execute_query_async(self._session, prepared_query,
+                                            [tile_id, bytearray(serialized_tile_data)])
+        except Exception:
+            raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}.")
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
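
The body of _execute_query_async is cut off by the hunk boundary above. The
usual way to bridge the Cassandra driver's ResponseFuture into asyncio looks
roughly like this (a sketch under that assumption, not necessarily the
project's exact implementation):

    import asyncio

    from cassandra.cluster import Session

    async def _execute_query_async(session: Session, query, parameters=None):
        loop = asyncio.get_running_loop()
        asyncio_future = loop.create_future()
        response_future = session.execute_async(query, parameters)

        # The driver fires callbacks on its own I/O thread, so hop back onto
        # the event loop thread before touching the asyncio future.
        response_future.add_callbacks(
            lambda result: loop.call_soon_threadsafe(asyncio_future.set_result, result),
            lambda exc: loop.call_soon_threadsafe(asyncio_future.set_exception, exc))
        return await asyncio_future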
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index a6d64a2..0479f99 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,4 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
 aioboto3
+tblib==1.6.0
\ No newline at end of file