You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/07/11 18:55:00 UTC

[beam] branch master updated: Override log levels after log handler is created (#22191)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eb071fa1036 Override log levels after log handler is created (#22191)
eb071fa1036 is described below

commit eb071fa10361360551a6a71bbdfd0bccc07ff0c5
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Mon Jul 11 14:54:53 2022 -0400

    Override log levels after log handler is created (#22191)
    
    Co-authored-by: tvalentyn <tv...@users.noreply.github.com>
---
 .../apache_beam/runners/worker/sdk_worker_main.py  | 17 ++++++++-------
 .../runners/worker/sdk_worker_main_test.py         | 25 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 706e1e61e23..53cdbad5d71 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -69,11 +69,6 @@ def _import_beam_plugins(plugins):
 
 def create_harness(environment, dry_run=False):
   """Creates SDK Fn Harness."""
-  pipeline_options_dict = _load_pipeline_options(
-      environment.get('PIPELINE_OPTIONS'))
-  default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
-  logging.getLogger().setLevel(default_log_level)
-  _set_log_level_overrides(pipeline_options_dict)
 
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
     try:
@@ -94,6 +89,12 @@ def create_harness(environment, dry_run=False):
   else:
     fn_log_handler = None
 
+  pipeline_options_dict = _load_pipeline_options(
+      environment.get('PIPELINE_OPTIONS'))
+  default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
+  logging.getLogger().setLevel(default_log_level)
+  _set_log_level_overrides(pipeline_options_dict)
+
   # These are used for dataflow templates.
   RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
   sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
@@ -256,8 +257,8 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
   """Get log level from options dict's entry `default_sdk_harness_log_level`.
   If not specified, default log level is logging.INFO.
   """
-  log_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
-
+  dict_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
+  log_level = dict_level
   if log_level.isdecimal():
     log_level = int(log_level)
   else:
@@ -265,7 +266,7 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
     log_level = getattr(logging, log_level, None)
     if not isinstance(log_level, int):
       # unknown log level.
-      _LOGGER.error("Unknown log level. Use default value INFO.", exc_info=True)
+      _LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
       log_level = logging.INFO
 
   return log_level
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index fe15b579d3c..1e976bc0001 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -19,6 +19,7 @@
 
 # pytype: skip-file
 
+import io
 import logging
 import unittest
 
@@ -100,6 +101,30 @@ class SdkWorkerMainTest(unittest.TestCase):
     self.assertTrue(test_runtime_provider.is_accessible())
     self.assertEqual(test_runtime_provider.get(), 37)
 
+  def test_create_sdk_harness_log_handler_received_log(self):
+    # tests that the log handler created in create_harness() does not miss
+    # logs emitted from create_harness() itself.
+    logstream = io.StringIO()
+
+    class InMemoryHandler(logging.StreamHandler):
+      def __init__(self, *unused):
+        super().__init__(stream=logstream)
+
+    with unittest.mock.patch(
+        'apache_beam.runners.worker.sdk_worker_main.FnApiLogRecordHandler',
+        InMemoryHandler):
+      sdk_worker_main.create_harness({
+          'LOGGING_API_SERVICE_DESCRIPTOR': '',
+          'CONTROL_API_SERVICE_DESCRIPTOR': '',
+          'PIPELINE_OPTIONS': '{"default_sdk_harness_log_level":"INVALID",'
+          '"sdk_harness_log_level_overrides":"{INVALID_JSON}"}',
+      },
+                                     dry_run=True)
+    logstream.seek(0)
+    logs = logstream.read()
+    self.assertIn('Unknown log level', logs)
+    self.assertIn('Unable to parse sdk_harness_log_level_overrides', logs)
+
   def test_import_beam_plugins(self):
     sdk_worker_main._import_beam_plugins(BeamPlugin.get_all_plugin_paths())