You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by rk...@apache.org on 2023/01/24 23:14:34 UTC
[incubator-sdap-ingester] branch dev updated: SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)
This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 9785af1 SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)
9785af1 is described below
commit 9785af1d07ee75574bf9ee6f4e12c6b6c8cc7f39
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Tue Jan 24 15:14:29 2023 -0800
SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)
* Fix verbosity settings not propagating to ingester subprocesses
* Changelog
Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
CHANGELOG.md | 9 +++++++++
.../granule_ingester/consumer/MessageConsumer.py | 12 ++++++++----
granule_ingester/granule_ingester/main.py | 4 +++-
.../granule_ingester/pipeline/Pipeline.py | 20 ++++++++++++++++----
.../granule_ingester/writers/SolrStore.py | 1 -
5 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 72eccca..7033caf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+### Added
+### Changed
+### Deprecated
+### Removed
+### Fixed
+- SDAP-423: Fixed verbosity settings not propagating to ingester subprocesses
+### Security
+
## [1.0.0] - 2022-12-05
### Added
### Changed
diff --git a/granule_ingester/granule_ingester/consumer/MessageConsumer.py b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
index 4d6c07b..7bbddee 100644
--- a/granule_ingester/granule_ingester/consumer/MessageConsumer.py
+++ b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
@@ -16,7 +16,6 @@
import logging
import aio_pika
-
from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
@@ -33,7 +32,8 @@ class MessageConsumer(HealthCheck):
rabbitmq_password,
rabbitmq_queue,
data_store_factory,
- metadata_store_factory):
+ metadata_store_factory,
+ log_level=logging.INFO):
self._rabbitmq_queue = rabbitmq_queue
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
@@ -42,6 +42,7 @@ class MessageConsumer(HealthCheck):
password=rabbitmq_password,
host=rabbitmq_host)
self._connection: aio_pika.Connection = None
+ self._level = log_level
async def health_check(self) -> bool:
try:
@@ -67,7 +68,8 @@ class MessageConsumer(HealthCheck):
async def _received_message(message: aio_pika.IncomingMessage,
data_store_factory,
metadata_store_factory,
- pipeline_max_concurrency: int):
+ pipeline_max_concurrency: int,
+ log_level=logging.INFO):
logger.info("Received a job from the queue. Starting pipeline.")
try:
config_str = message.body.decode("utf-8")
@@ -76,6 +78,7 @@ class MessageConsumer(HealthCheck):
data_store_factory=data_store_factory,
metadata_store_factory=metadata_store_factory,
max_concurrency=pipeline_max_concurrency)
+ pipeline.set_log_level(log_level)
await pipeline.run()
await message.ack()
except PipelineBuildingError as e:
@@ -102,7 +105,8 @@ class MessageConsumer(HealthCheck):
await self._received_message(message,
self._data_store_factory,
self._metadata_store_factory,
- pipeline_max_concurrency)
+ pipeline_max_concurrency,
+ self._level)
except aio_pika.exceptions.MessageProcessError:
# Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
# connection has died, and attempting to close the queue will only raise another exception.
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 84ab004..453de44 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -140,6 +140,7 @@ async def main(loop):
logging_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=logging_level)
+ logging.getLogger("").setLevel(logging_level)
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
for logger in loggers:
logger.setLevel(logging_level)
@@ -175,7 +176,8 @@ async def main(loop):
cassandra_keyspace,
cassandra_username,
cassandra_password),
- metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+ metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port),
+ log_level=logging_level)
try:
solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
await run_health_checks([CassandraStore(cassandra_contact_points,
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 484da6d..41bfc3a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -21,7 +21,6 @@ from typing import List
import xarray as xr
import yaml
-
from aiomultiprocess import Pool
from aiomultiprocess.types import ProxyException
from granule_ingester.exceptions import PipelineBuildingError
@@ -48,7 +47,7 @@ _worker_dataset = None
_shared_memory = None
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory, log_level):
global _worker_processor_list
global _worker_dataset
global _shared_memory
@@ -59,6 +58,13 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
_worker_dataset = dataset
_shared_memory = shared_memory
+ logging.basicConfig(level=log_level)
+
+ logging.getLogger("").setLevel(log_level)
+ loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
+ for logger in loggers:
+ logger.setLevel(log_level)
+
logger.debug("worker init")
async def _process_tile_in_worker(serialized_input_tile: str):
@@ -115,13 +121,15 @@ class Pipeline:
data_store_factory,
metadata_store_factory,
tile_processors: List[TileProcessor],
- max_concurrency: int):
+ max_concurrency: int,
+ log_level=logging.INFO):
self._granule_loader = granule_loader
self._tile_processors = tile_processors
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
self._max_concurrency = int(max_concurrency)
+ self._level = log_level
# Create a SyncManager so that we can to communicate exceptions from the
# worker processes back to the main process.
@@ -130,6 +138,9 @@ class Pipeline:
def __del__(self):
self._manager.shutdown()
+ def set_log_level(self, level):
+ self._level = level
+
@classmethod
def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
logger.debug(f'config_str: {config_str}')
@@ -209,7 +220,8 @@ class Pipeline:
dataset,
self._data_store_factory,
self._metadata_store_factory,
- shared_memory),
+ shared_memory,
+ self._level),
childconcurrency=self._max_concurrency) as pool:
serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset, granule_name)]
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 8426c45..cd9fe08 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -50,7 +50,6 @@ class SolrStore(MetadataStore):
self.geo_precision: int = 3
self._collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
- self.log.setLevel(logging.DEBUG)
self._solr = None
def _get_collections(self, zk, parent_nodes):