You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 02:14:29 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-273: Configure max threads in Granule Ingester (#13)
This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 68110d0 SDAP-273: Configure max threads in Granule Ingester (#13)
68110d0 is described below
commit 68110d0f4544e4d39301fcb3682e4f67b935c666
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Aug 4 19:14:22 2020 -0700
SDAP-273: Configure max threads in Granule Ingester (#13)
---
granule_ingester/docker/entrypoint.sh | 19 ++++++++-------
.../granule_ingester/consumer/Consumer.py | 15 ++++++++----
granule_ingester/granule_ingester/main.py | 24 +++++++++++--------
.../granule_ingester/pipeline/Pipeline.py | 28 ++++++++++++++--------
4 files changed, 52 insertions(+), 34 deletions(-)
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..04ed15c 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,12 +1,13 @@
#!/bin/sh
python /sdap/granule_ingester/main.py \
- $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
- $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \
- $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \
- $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
- $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
- $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
- $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
- $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
- $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+ $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
+ $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
+ $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
+ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
+ $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \
+ $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \
+ $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra-username=$CASSANDRA_USERNAME) \
+ $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
+ $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
+ $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 75d347a..5df51fe 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -46,7 +46,7 @@ class Consumer(HealthCheck):
connection = await self._get_connection()
await connection.close()
return True
- except:
+ except Exception:
logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
return False
@@ -64,25 +64,30 @@ class Consumer(HealthCheck):
@staticmethod
async def _received_message(message: aio_pika.IncomingMessage,
data_store_factory,
- metadata_store_factory):
+ metadata_store_factory,
+ pipeline_max_concurrency: int):
logger.info("Received a job from the queue. Starting pipeline.")
try:
config_str = message.body.decode("utf-8")
logger.debug(config_str)
pipeline = Pipeline.from_string(config_str=config_str,
data_store_factory=data_store_factory,
- metadata_store_factory=metadata_store_factory)
+ metadata_store_factory=metadata_store_factory,
+ max_concurrency=pipeline_max_concurrency)
await pipeline.run()
message.ack()
except Exception as e:
message.reject(requeue=True)
logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e))
- async def start_consuming(self):
+ async def start_consuming(self, pipeline_max_concurrency=16):
channel = await self._connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
- await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+ await self._received_message(message,
+ self._data_store_factory,
+ self._metadata_store_factory,
+ pipeline_max_concurrency)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9010e33..b54cffd 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -47,43 +47,47 @@ async def run_health_checks(dependencies: List[HealthCheck]):
async def main():
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
'and ingest a granule for each message that comes through.')
- parser.add_argument('--rabbitmq_host',
+ parser.add_argument('--rabbitmq-host',
default='localhost',
metavar='HOST',
help='RabbitMQ hostname to connect to. (Default: "localhost")')
- parser.add_argument('--rabbitmq_username',
+ parser.add_argument('--rabbitmq-username',
default='guest',
metavar='USERNAME',
help='RabbitMQ username. (Default: "guest")')
- parser.add_argument('--rabbitmq_password',
+ parser.add_argument('--rabbitmq-password',
default='guest',
metavar='PASSWORD',
help='RabbitMQ password. (Default: "guest")')
- parser.add_argument('--rabbitmq_queue',
+ parser.add_argument('--rabbitmq-queue',
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
- parser.add_argument('--cassandra_contact_points',
+ parser.add_argument('--cassandra-contact-points',
default=['localhost'],
metavar="HOST",
nargs='+',
help='List of one or more Cassandra contact points, separated by spaces. (Default: "localhost")')
- parser.add_argument('--cassandra_port',
+ parser.add_argument('--cassandra-port',
default=9042,
metavar="PORT",
help='Cassandra port. (Default: 9042)')
- parser.add_argument('--cassandra_username',
+ parser.add_argument('--cassandra-username',
metavar="USERNAME",
default=None,
help='Cassandra username. Optional.')
- parser.add_argument('--cassandra_password',
+ parser.add_argument('--cassandra-password',
metavar="PASSWORD",
default=None,
help='Cassandra password. Optional.')
- parser.add_argument('--solr_host_and_port',
+ parser.add_argument('--solr-host-and-port',
default='http://localhost:8983',
metavar='HOST:PORT',
help='Solr host and port. (Default: http://localhost:8983)')
+ parser.add_argument('--max-threads',
+ default=16,
+ metavar='MAX_THREADS',
+ help='Maximum number of threads to use when processing granules. (Default: 16)')
parser.add_argument('-v',
'--verbose',
action='store_true',
@@ -126,7 +130,7 @@ async def main():
consumer]):
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
- await consumer.start_consuming()
+ await consumer.start_consuming(args.max_threads)
else:
logger.error("Quitting because not all dependencies passed the health checks.")
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 8f2dd6f..e1e53bf 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -15,19 +15,20 @@
import logging
+import os
import time
from typing import List
-import aiomultiprocess
import xarray as xr
import yaml
-from nexusproto import DataTile_pb2 as nexusproto
+import aiomultiprocess
from granule_ingester.granule_loaders import GranuleLoader
from granule_ingester.pipeline.Modules import modules as processor_module_mappings
from granule_ingester.processors.TileProcessor import TileProcessor
from granule_ingester.slicers import TileSlicer
from granule_ingester.writers import DataStore, MetadataStore
+from nexusproto import DataTile_pb2 as nexusproto
logger = logging.getLogger(__name__)
@@ -81,36 +82,41 @@ class Pipeline:
slicer: TileSlicer,
data_store_factory,
metadata_store_factory,
- tile_processors: List[TileProcessor]):
+ tile_processors: List[TileProcessor],
+ max_concurrency: int):
self._granule_loader = granule_loader
self._tile_processors = tile_processors
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
+ self._max_concurrency = max_concurrency
@classmethod
- def from_string(cls, config_str: str, data_store_factory, metadata_store_factory):
+ def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
config = yaml.load(config_str, yaml.FullLoader)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
- processor_module_mappings)
+ processor_module_mappings,
+ max_concurrency)
@classmethod
- def from_file(cls, config_path: str, data_store_factory, metadata_store_factory):
+ def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
with open(config_path) as config_file:
config = yaml.load(config_file, yaml.FullLoader)
return cls._build_pipeline(config,
data_store_factory,
metadata_store_factory,
- processor_module_mappings)
+ processor_module_mappings,
+ max_concurrency)
@classmethod
def _build_pipeline(cls,
config: dict,
data_store_factory,
metadata_store_factory,
- module_mappings: dict):
+ module_mappings: dict,
+ max_concurrency: int):
granule_loader = GranuleLoader(**config['granule'])
slicer_config = config['slicer']
@@ -121,7 +127,7 @@ class Pipeline:
module = cls._parse_module(processor_config, module_mappings)
tile_processors.append(module)
- return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors)
+ return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency)
@classmethod
def _parse_module(cls, module_config: dict, module_mappings: dict):
@@ -142,7 +148,9 @@ class Pipeline:
initargs=(self._tile_processors,
dataset,
self._data_store_factory,
- self._metadata_store_factory)) as pool:
+ self._metadata_store_factory),
+ maxtasksperchild=self._max_concurrency,
+ childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
# aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that