You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/07/11 18:55:00 UTC
[beam] branch master updated: Override log levels after log handler is created (#22191)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eb071fa1036 Override log levels after log handler is created (#22191)
eb071fa1036 is described below
commit eb071fa10361360551a6a71bbdfd0bccc07ff0c5
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Mon Jul 11 14:54:53 2022 -0400
Override log levels after log handler is created (#22191)
Co-authored-by: tvalentyn <tv...@users.noreply.github.com>
---
.../apache_beam/runners/worker/sdk_worker_main.py | 17 ++++++++-------
.../runners/worker/sdk_worker_main_test.py | 25 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 706e1e61e23..53cdbad5d71 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -69,11 +69,6 @@ def _import_beam_plugins(plugins):
def create_harness(environment, dry_run=False):
"""Creates SDK Fn Harness."""
- pipeline_options_dict = _load_pipeline_options(
- environment.get('PIPELINE_OPTIONS'))
- default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
- logging.getLogger().setLevel(default_log_level)
- _set_log_level_overrides(pipeline_options_dict)
if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
try:
@@ -94,6 +89,12 @@ def create_harness(environment, dry_run=False):
else:
fn_log_handler = None
+ pipeline_options_dict = _load_pipeline_options(
+ environment.get('PIPELINE_OPTIONS'))
+ default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
+ logging.getLogger().setLevel(default_log_level)
+ _set_log_level_overrides(pipeline_options_dict)
+
# These are used for dataflow templates.
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
@@ -256,8 +257,8 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
"""Get log level from options dict's entry `default_sdk_harness_log_level`.
If not specified, default log level is logging.INFO.
"""
- log_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
-
+ dict_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
+ log_level = dict_level
if log_level.isdecimal():
log_level = int(log_level)
else:
@@ -265,7 +266,7 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
log_level = getattr(logging, log_level, None)
if not isinstance(log_level, int):
# unknown log level.
- _LOGGER.error("Unknown log level. Use default value INFO.", exc_info=True)
+ _LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
log_level = logging.INFO
return log_level
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index fe15b579d3c..1e976bc0001 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -19,6 +19,7 @@
# pytype: skip-file
+import io
import logging
import unittest
@@ -100,6 +101,30 @@ class SdkWorkerMainTest(unittest.TestCase):
self.assertTrue(test_runtime_provider.is_accessible())
self.assertEqual(test_runtime_provider.get(), 37)
+ def test_create_sdk_harness_log_handler_received_log(self):
+ # tests that the log handler created in create_harness() does not miss
+ # logs emitted from create_harness() itself.
+ logstream = io.StringIO()
+
+ class InMemoryHandler(logging.StreamHandler):
+ def __init__(self, *unused):
+ super().__init__(stream=logstream)
+
+ with unittest.mock.patch(
+ 'apache_beam.runners.worker.sdk_worker_main.FnApiLogRecordHandler',
+ InMemoryHandler):
+ sdk_worker_main.create_harness({
+ 'LOGGING_API_SERVICE_DESCRIPTOR': '',
+ 'CONTROL_API_SERVICE_DESCRIPTOR': '',
+ 'PIPELINE_OPTIONS': '{"default_sdk_harness_log_level":"INVALID",'
+ '"sdk_harness_log_level_overrides":"{INVALID_JSON}"}',
+ },
+ dry_run=True)
+ logstream.seek(0)
+ logs = logstream.read()
+ self.assertIn('Unknown log level', logs)
+ self.assertIn('Unable to parse sdk_harness_log_level_overrides', logs)
+
def test_import_beam_plugins(self):
sdk_worker_main._import_beam_plugins(BeamPlugin.get_all_plugin_paths())