Posted to commits@sdap.apache.org by rk...@apache.org on 2023/01/24 23:14:34 UTC

[incubator-sdap-ingester] branch dev updated: SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)

This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9785af1  SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)
9785af1 is described below

commit 9785af1d07ee75574bf9ee6f4e12c6b6c8cc7f39
Author: Riley Kuttruff <72...@users.noreply.github.com>
AuthorDate: Tue Jan 24 15:14:29 2023 -0800

    SDAP-423: Fixed ingester verbose setting not propagating to subprocesses (#70)
    
    * Fix verbosity settings not propagating to ingester subprocesses
    
    * Changelog
    
    Co-authored-by: rileykk <ri...@jpl.nasa.gov>
---
 CHANGELOG.md                                         |  9 +++++++++
 .../granule_ingester/consumer/MessageConsumer.py     | 12 ++++++++----
 granule_ingester/granule_ingester/main.py            |  4 +++-
 .../granule_ingester/pipeline/Pipeline.py            | 20 ++++++++++++++++----
 .../granule_ingester/writers/SolrStore.py            |  1 -
 5 files changed, 36 insertions(+), 10 deletions(-)
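
The heart of the fix is visible in the diffs below: the chosen log level is handed from main.py to the MessageConsumer, from there to each Pipeline, and finally into the aiomultiprocess worker initializer, because worker processes do not automatically share the parent's logging configuration. A minimal sketch of that last step, with illustrative names (worker_init and echo are not from this commit), using aiomultiprocess's documented Pool(initializer=..., initargs=...) parameters:

    import asyncio
    import logging

    from aiomultiprocess import Pool

    def worker_init(log_level):
        # Runs once inside each worker process; re-applies the level the
        # parent chose, since the child starts with default logging state.
        logging.basicConfig(level=log_level)
        logging.getLogger().setLevel(log_level)

    async def echo(item):
        # Emitted only if the DEBUG level actually reached this process.
        logging.getLogger(__name__).debug(f"processing {item}")
        return item

    async def main():
        async with Pool(initializer=worker_init, initargs=(logging.DEBUG,)) as pool:
            results = await pool.map(echo, range(4))
            print(results)

    if __name__ == "__main__":
        asyncio.run(main())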

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 72eccca..7033caf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+### Added
+### Changed
+### Deprecated
+### Removed
+### Fixed
+- SDAP-423: Fixed verbosity settings not propagating to ingester subprocesses
+### Security
+
 ## [1.0.0] - 2022-12-05
 ### Added
 ### Changed
diff --git a/granule_ingester/granule_ingester/consumer/MessageConsumer.py b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
index 4d6c07b..7bbddee 100644
--- a/granule_ingester/granule_ingester/consumer/MessageConsumer.py
+++ b/granule_ingester/granule_ingester/consumer/MessageConsumer.py
@@ -16,7 +16,6 @@
 import logging
 
 import aio_pika
-
 from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
     RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
@@ -33,7 +32,8 @@ class MessageConsumer(HealthCheck):
                  rabbitmq_password,
                  rabbitmq_queue,
                  data_store_factory,
-                 metadata_store_factory):
+                 metadata_store_factory,
+                 log_level=logging.INFO):
         self._rabbitmq_queue = rabbitmq_queue
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
@@ -42,6 +42,7 @@ class MessageConsumer(HealthCheck):
                                                                                 password=rabbitmq_password,
                                                                                 host=rabbitmq_host)
         self._connection: aio_pika.Connection = None
+        self._level = log_level
 
     async def health_check(self) -> bool:
         try:
@@ -67,7 +68,8 @@ class MessageConsumer(HealthCheck):
     async def _received_message(message: aio_pika.IncomingMessage,
                                 data_store_factory,
                                 metadata_store_factory,
-                                pipeline_max_concurrency: int):
+                                pipeline_max_concurrency: int,
+                                log_level=logging.INFO):
         logger.info("Received a job from the queue. Starting pipeline.")
         try:
             config_str = message.body.decode("utf-8")
@@ -76,6 +78,7 @@ class MessageConsumer(HealthCheck):
                                             data_store_factory=data_store_factory,
                                             metadata_store_factory=metadata_store_factory,
                                             max_concurrency=pipeline_max_concurrency)
+            pipeline.set_log_level(log_level)
             await pipeline.run()
             await message.ack()
         except PipelineBuildingError as e:
@@ -102,7 +105,8 @@ class MessageConsumer(HealthCheck):
                 await self._received_message(message,
                                              self._data_store_factory,
                                              self._metadata_store_factory,
-                                             pipeline_max_concurrency)
+                                             pipeline_max_concurrency,
+                                             self._level)
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
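
With this change, a caller opts into verbose output once, at construction time; the stored level is then forwarded to every pipeline built from an incoming message via set_log_level() before run() is called. A hedged wiring example (host, credentials, and the factory stubs are placeholders, not values from the commit):

    import logging

    # Import path assumed from the repo layout shown in the diff.
    from granule_ingester.consumer import MessageConsumer

    data_store_factory = lambda: None      # placeholder factory
    metadata_store_factory = lambda: None  # placeholder factory

    consumer = MessageConsumer(rabbitmq_host="localhost",
                               rabbitmq_username="guest",
                               rabbitmq_password="guest",
                               rabbitmq_queue="nexus",
                               data_store_factory=data_store_factory,
                               metadata_store_factory=metadata_store_factory,
                               log_level=logging.DEBUG)  # new keyword; defaults to logging.INFO
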
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 84ab004..453de44 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -140,6 +140,7 @@ async def main(loop):
 
     logging_level = logging.DEBUG if args.verbose else logging.INFO
     logging.basicConfig(level=logging_level)
+    logging.getLogger("").setLevel(logging_level)
     loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
     for logger in loggers:
         logger.setLevel(logging_level)
@@ -175,7 +176,8 @@ async def main(loop):
                                                               cassandra_keyspace,
                                                               cassandra_username,
                                                               cassandra_password),
-                                   metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
+                                   metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port),
+                                   log_level=logging_level)
         try:
             solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
             await run_health_checks([CassandraStore(cassandra_contact_points,
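
The main.py hunk applies an idiom this commit uses twice: set the level on the root logger, then force it onto every logger already registered with the logging manager, since loggers created before basicConfig() may carry their own levels. The same idiom, extracted into a standalone helper (set_global_log_level is an illustrative name, not part of the commit):

    import logging

    def set_global_log_level(level: int) -> None:
        # Configure the root handler, then force the level onto the root
        # logger and onto every logger the manager already knows about.
        logging.basicConfig(level=level)
        logging.getLogger().setLevel(level)
        for name in logging.root.manager.loggerDict:
            logging.getLogger(name).setLevel(level)

    set_global_log_level(logging.DEBUG)
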
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 484da6d..41bfc3a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -21,7 +21,6 @@ from typing import List
 
 import xarray as xr
 import yaml
-
 from aiomultiprocess import Pool
 from aiomultiprocess.types import ProxyException
 from granule_ingester.exceptions import PipelineBuildingError
@@ -48,7 +47,7 @@ _worker_dataset = None
 _shared_memory = None
 
 
-def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
+def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory, log_level):
     global _worker_processor_list
     global _worker_dataset
     global _shared_memory
@@ -59,6 +58,13 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
     _worker_dataset = dataset
     _shared_memory = shared_memory
 
+    logging.basicConfig(level=log_level)
+
+    logging.getLogger("").setLevel(log_level)
+    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
+    for logger in loggers:
+        logger.setLevel(log_level)
+
     logger.debug("worker init")
 
 async def _process_tile_in_worker(serialized_input_tile: str):
@@ -115,13 +121,15 @@ class Pipeline:
                  data_store_factory,
                  metadata_store_factory,
                  tile_processors: List[TileProcessor],
-                 max_concurrency: int):
+                 max_concurrency: int,
+                 log_level=logging.INFO):
         self._granule_loader = granule_loader
         self._tile_processors = tile_processors
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
         self._max_concurrency = int(max_concurrency)
+        self._level = log_level
 
         # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
@@ -130,6 +138,9 @@ class Pipeline:
     def __del__(self):
         self._manager.shutdown()
 
+    def set_log_level(self, level):
+        self._level = level
+
     @classmethod
     def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16):
         logger.debug(f'config_str: {config_str}')
@@ -209,7 +220,8 @@ class Pipeline:
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
-                                      shared_memory),
+                                      shared_memory,
+                                      self._level),
                             childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
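
The _init_worker change is what makes the propagation stick: aiomultiprocess runs each worker in its own process (typically started via the "spawn" method), which begins with a fresh interpreter and default logging state. A standalone demonstration with stdlib multiprocessing, independent of SDAP, showing why the level must be shipped to the child and re-applied there:

    import logging
    import multiprocessing as mp

    def child(level):
        # Fresh interpreter under "spawn": without this line, the root logger
        # would be back at its WARNING default despite the parent's DEBUG.
        logging.getLogger().setLevel(level)
        print(logging.getLogger().getEffectiveLevel())  # 10 (DEBUG)

    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.DEBUG)
        ctx = mp.get_context("spawn")
        p = ctx.Process(target=child, args=(logging.DEBUG,))
        p.start()
        p.join()
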
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 8426c45..cd9fe08 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -50,7 +50,6 @@ class SolrStore(MetadataStore):
         self.geo_precision: int = 3
         self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
-        self.log.setLevel(logging.DEBUG)
         self._solr = None
 
     def _get_collections(self, zk, parent_nodes):
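
The one-line SolrStore.py removal is the complement of everything above: a hardcoded self.log.setLevel(logging.DEBUG) pins that logger regardless of what the application configures, so the propagated verbosity setting could never take effect there. The corrected pattern, reduced to a sketch (SolrLikeStore is illustrative, not a class from the commit):

    import logging

    class SolrLikeStore:
        def __init__(self):
            # Take the module logger as-is and let the application set the
            # level; hardcoding DEBUG here would override global verbosity.
            self.log: logging.Logger = logging.getLogger(__name__)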