Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 02:14:29 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-273: Configure max threads in Granule Ingester (#13)

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 68110d0  SDAP-273: Configure max threads in Granule Ingester (#13)
68110d0 is described below

commit 68110d0f4544e4d39301fcb3682e4f67b935c666
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 19:14:22 2020 -0700

    SDAP-273: Configure max threads in Granule Ingester (#13)
---
 granule_ingester/docker/entrypoint.sh              | 19 ++++++++-------
 .../granule_ingester/consumer/Consumer.py          | 15 ++++++++----
 granule_ingester/granule_ingester/main.py          | 24 +++++++++++--------
 .../granule_ingester/pipeline/Pipeline.py          | 28 ++++++++++++++--------
 4 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..04ed15c 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,12 +1,13 @@
 #!/bin/sh
 
 python /sdap/granule_ingester/main.py \
-  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
-  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \
-  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \
-  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
-  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
-  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
-  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
+  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
+  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
+  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
+  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \
+  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra-username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
+  $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..5df51fe 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -46,7 +46,7 @@ class Consumer(HealthCheck):
             connection = await self._get_connection()
             await connection.close()
             return True
-        except:
+        except Exception:
             logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
             return False
 
@@ -64,25 +64,30 @@ class Consumer(HealthCheck):
     @staticmethod
     async def _received_message(message: aio_pika.IncomingMessage,
                                 data_store_factory,
-                                metadata_store_factory):
+                                metadata_store_factory,
+                                pipeline_max_concurrency: int):
         logger.info("Received a job from the queue. Starting pipeline.")
         try:
             config_str = message.body.decode("utf-8")
             logger.debug(config_str)
             pipeline = Pipeline.from_string(config_str=config_str,
                                             data_store_factory=data_store_factory,
-                                            metadata_store_factory=metadata_store_factory)
+                                            metadata_store_factory=metadata_store_factory,
+                                            max_concurrency=pipeline_max_concurrency)
             await pipeline.run()
             message.ack()
         except Exception as e:
             message.reject(requeue=True)
             logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
 
-    async def start_consuming(self):
+    async def start_consuming(self, pipeline_max_concurrency=16):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
 
         async with queue.iterator() as queue_iter:
             async for message in queue_iter:
-                await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+                await self._received_message(message,
+                                             self._data_store_factory,
+                                             self._metadata_store_factory,
+                                             pipeline_max_concurrency)
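
The consumer now threads a pipeline_max_concurrency value (default 16)
from start_consuming() into each Pipeline it builds per message. Combined
with prefetch_count=1, each ingester process handles one RabbitMQ message
at a time, and the new parameter caps the parallelism inside that single
pipeline run. A minimal sketch of the aio_pika consume pattern used above
(the connection string, queue name, and handler are illustrative):

    import aio_pika

    async def consume(connection_string: str, queue_name: str, handle):
        # prefetch_count=1: RabbitMQ delivers at most one unacked message
        # at a time, so each pipeline run is acked or rejected before the
        # next job is pulled off the queue.
        connection = await aio_pika.connect_robust(connection_string)
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)
        queue = await channel.declare_queue(queue_name, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                await handle(message)
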
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9010e33..b54cffd 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -47,43 +47,47 @@ async def run_health_checks(dependencies: List[HealthCheck]):
 async def main():
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
-    parser.add_argument('--rabbitmq_host',
+    parser.add_argument('--rabbitmq-host',
                         default='localhost',
                         metavar='HOST',
                         help='RabbitMQ hostname to connect to. (Default: "localhost")')
-    parser.add_argument('--rabbitmq_username',
+    parser.add_argument('--rabbitmq-username',
                         default='guest',
                         metavar='USERNAME',
                         help='RabbitMQ username. (Default: "guest")')
-    parser.add_argument('--rabbitmq_password',
+    parser.add_argument('--rabbitmq-password',
                         default='guest',
                         metavar='PASSWORD',
                         help='RabbitMQ password. (Default: "guest")')
-    parser.add_argument('--rabbitmq_queue',
+    parser.add_argument('--rabbitmq-queue',
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
-    parser.add_argument('--cassandra_contact_points',
+    parser.add_argument('--cassandra-contact-points',
                         default=['localhost'],
                         metavar="HOST",
                         nargs='+',
                         help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")')
-    parser.add_argument('--cassandra_port',
+    parser.add_argument('--cassandra-port',
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
-    parser.add_argument('--cassandra_username',
+    parser.add_argument('--cassandra-username',
                         metavar="USERNAME",
                         default=None,
                         help='Cassandra username. Optional.')
-    parser.add_argument('--cassandra_password',
+    parser.add_argument('--cassandra-password',
                         metavar="PASSWORD",
                         default=None,
                         help='Cassandra password. Optional.')
-    parser.add_argument('--solr_host_and_port',
+    parser.add_argument('--solr-host-and-port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--max-threads',
+                        default=16,
+                        metavar='MAX_THREADS',
+                        help='Maximum number of threads to use when processing granules. (Default: 16)')
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -126,7 +130,7 @@ async def main():
                                 consumer]):
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-            await consumer.start_consuming()
+            await consumer.start_consuming(args.max_threads)
     else:
         logger.error("Quitting because not all dependencies passed the health checks.")
 
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..e1e53bf 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -15,19 +15,20 @@
 
 
 import logging
+import os
 import time
 from typing import List
 
-import aiomultiprocess
 import xarray as xr
 import yaml
-from nexusproto import DataTile_pb2 as nexusproto
 
+import aiomultiprocess
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
 from granule_ingester.slicers import TileSlicer
 from granule_ingester.writers import DataStore, MetadataStore
+from nexusproto import DataTile_pb2 as nexusproto
 
 logger = logging.getLogger(__name__)
 
@@ -81,36 +82,41 @@ class Pipeline:
                  slicer: TileSlicer,
                  data_store_factory,
                  metadata_store_factory,
-                 tile_processors: List[TileProcessor]):
+                 tile_processors: List[TileProcessor],
+                 max_concurrency: int):
         self._granule_loader = granule_loader
         self._tile_processors = tile_processors
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
+        self._max_concurrency = max_concurrency
 
     @classmethod
-    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
+    def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
         config = yaml.load(config_str, yaml.FullLoader)
         return cls._build_pipeline(config,
                                    data_store_factory,
                                    metadata_store_factory,
-                                   processor_module_mappings)
+                                   processor_module_mappings,
+                                   max_concurrency)
 
     @classmethod
-    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
+    def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
         with open(config_path) as config_file:
             config = yaml.load(config_file, yaml.FullLoader)
             return cls._build_pipeline(config,
                                        data_store_factory,
                                        metadata_store_factory,
-                                       processor_module_mappings)
+                                       processor_module_mappings,
+                                       max_concurrency)
 
     @classmethod
     def _build_pipeline(cls,
                         config: dict,
                         data_store_factory,
                         metadata_store_factory,
-                        module_mappings: dict):
+                        module_mappings: dict,
+                        max_concurrency: int):
         granule_loader = GranuleLoader(**config['granule'])
 
         slicer_config = config['slicer']
@@ -121,7 +127,7 @@ class Pipeline:
             module = cls._parse_module(processor_config, module_mappings)
             tile_processors.append(module)
 
-        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+        return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
 
     @classmethod
     def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -142,7 +148,9 @@ class Pipeline:
                                             initargs=(self._tile_processors,
                                                       dataset,
                                                       self._data_store_factory,
-                                                      self._metadata_store_factory)) as pool:
+                                                      self._metadata_store_factory),
+                                            maxtasksperchild=self._max_concurrency,
+                                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
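
Pipeline forwards max_concurrency to the aiomultiprocess pool as both
childconcurrency (how many coroutines each worker process runs
concurrently; this is what actually bounds per-worker parallelism) and
maxtasksperchild (how many tasks a worker completes before it is
recycled, normally a memory-hygiene knob rather than a concurrency
limit). A minimal, self-contained sketch of those two knobs (the work
function and inputs are illustrative):

    import asyncio

    import aiomultiprocess

    async def work(item: int) -> int:
        await asyncio.sleep(0.01)  # stand-in for processing one tile
        return item * 2

    async def main():
        # childconcurrency: coroutines running concurrently in each worker
        # maxtasksperchild: tasks a worker completes before being recycled
        async with aiomultiprocess.Pool(childconcurrency=16,
                                        maxtasksperchild=16) as pool:
            results = await pool.map(work, range(100))
        print(len(results))  # 100

    if __name__ == '__main__':
        asyncio.run(main())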