You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/07/23 00:20:59 UTC

[beam] branch master updated: Log level fix on local runner (#22420)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a6f07296fee Log level fix on local runner (#22420)
a6f07296fee is described below

commit a6f07296fee95618c8bad72d68aebe53d00ada6f
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Jul 22 19:20:52 2022 -0500

    Log level fix on local runner (#22420)
---
 .../portability/fn_api_runner/worker_handlers.py   | 15 ++-------
 .../runners/portability/local_job_service.py       |  6 +++-
 .../apache_beam/runners/worker/log_handler.py      | 38 +++++++++++++++-------
 sdks/python/apache_beam/utils/subprocess_server.py |  5 +--
 4 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 6f94f3efc4e..5aaadbd4387 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -65,6 +65,7 @@ from apache_beam.runners.portability.fn_api_runner.execution import Buffer
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
+from apache_beam.runners.worker.log_handler import LOGENTRY_TO_LOG_LEVEL_MAP
 from apache_beam.runners.worker.sdk_worker import _Future
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.utils import proto_utils
@@ -407,24 +408,12 @@ class EmbeddedWorkerHandler(WorkerHandler):
 
 
 class BasicLoggingService(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
-
-  LOG_LEVEL_MAP = {
-      beam_fn_api_pb2.LogEntry.Severity.CRITICAL: logging.CRITICAL,
-      beam_fn_api_pb2.LogEntry.Severity.ERROR: logging.ERROR,
-      beam_fn_api_pb2.LogEntry.Severity.WARN: logging.WARNING,
-      beam_fn_api_pb2.LogEntry.Severity.NOTICE: logging.INFO + 1,
-      beam_fn_api_pb2.LogEntry.Severity.INFO: logging.INFO,
-      beam_fn_api_pb2.LogEntry.Severity.DEBUG: logging.DEBUG,
-      beam_fn_api_pb2.LogEntry.Severity.TRACE: logging.DEBUG - 1,
-      beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED: logging.NOTSET,
-  }
-
   def Logging(self, log_messages, context=None):
     # type: (Iterable[beam_fn_api_pb2.LogEntry.List], Any) -> Iterator[beam_fn_api_pb2.LogControl]
     yield beam_fn_api_pb2.LogControl()
     for log_message in log_messages:
       for log in log_message.log_entries:
-        logging.log(self.LOG_LEVEL_MAP[log.severity], str(log))
+        logging.log(LOGENTRY_TO_LOG_LEVEL_MAP[log.severity], str(log))
 
 
 class BasicProvisionService(beam_provision_api_pb2_grpc.ProvisionServiceServicer
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 509ebb4e208..84d1ded714d 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -50,6 +50,7 @@ from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.portability.fn_api_runner import worker_handlers
+from apache_beam.runners.worker.log_handler import LOGENTRY_TO_LOG_LEVEL_MAP
 from apache_beam.utils import thread_pool_executor
 
 if TYPE_CHECKING:
@@ -367,7 +368,10 @@ class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
   def Logging(self, log_bundles, context=None):
     for log_bundle in log_bundles:
       for log_entry in log_bundle.log_entries:
-        _LOGGER.info('Worker: %s', str(log_entry).replace('\n', ' '))
+        _LOGGER.log(
+            LOGENTRY_TO_LOG_LEVEL_MAP[log_entry.severity],
+            'Worker: %s',
+            str(log_entry).replace('\n', ' '))
     return iter([])
 
 
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index 75cdcf5fb85..f5347d3884d 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -46,6 +46,29 @@ from apache_beam.utils.sentinel import Sentinel
 if TYPE_CHECKING:
   from apache_beam.portability.api import endpoints_pb2
 
+# Mapping from logging levels to LogEntry levels.
+LOG_LEVEL_TO_LOGENTRY_MAP = {
+    logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL,
+    logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR,
+    logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN,
+    logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO,
+    logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG,
+    logging.NOTSET: beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED,
+    -float('inf'): beam_fn_api_pb2.LogEntry.Severity.DEBUG,
+}
+
+# Mapping from LogEntry levels to logging levels
+LOGENTRY_TO_LOG_LEVEL_MAP = {
+    beam_fn_api_pb2.LogEntry.Severity.CRITICAL: logging.CRITICAL,
+    beam_fn_api_pb2.LogEntry.Severity.ERROR: logging.ERROR,
+    beam_fn_api_pb2.LogEntry.Severity.WARN: logging.WARNING,
+    beam_fn_api_pb2.LogEntry.Severity.NOTICE: logging.INFO + 1,
+    beam_fn_api_pb2.LogEntry.Severity.INFO: logging.INFO,
+    beam_fn_api_pb2.LogEntry.Severity.DEBUG: logging.DEBUG,
+    beam_fn_api_pb2.LogEntry.Severity.TRACE: logging.DEBUG - 1,
+    beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED: logging.NOTSET,
+}
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -60,16 +83,6 @@ class FnApiLogRecordHandler(logging.Handler):
   # dropped. If the average log size is 1KB this may use up to 10MB of memory.
   _QUEUE_SIZE = 10000
 
-  # Mapping from logging levels to LogEntry levels.
-  LOG_LEVEL_MAP = {
-      logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL,
-      logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR,
-      logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN,
-      logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO,
-      logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG,
-      -float('inf'): beam_fn_api_pb2.LogEntry.Severity.DEBUG,
-  }
-
   def __init__(self, log_service_descriptor):
     # type: (endpoints_pb2.ApiServiceDescriptor) -> None
     super().__init__()
@@ -101,11 +114,12 @@ class FnApiLogRecordHandler(logging.Handler):
   def map_log_level(self, level):
     # type: (int) -> beam_fn_api_pb2.LogEntry.Severity.Enum
     try:
-      return self.LOG_LEVEL_MAP[level]
+      return LOG_LEVEL_TO_LOGENTRY_MAP[level]
     except KeyError:
       return max(
           beam_level for python_level,
-          beam_level in self.LOG_LEVEL_MAP.items() if python_level <= level)
+          beam_level in LOG_LEVEL_TO_LOGENTRY_MAP.items()
+          if python_level <= level)
 
   def emit(self, record):
     # type: (logging.LogRecord) -> None
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 5341f9653bf..769a5f4fe05 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -121,8 +121,9 @@ class SubprocessServer(object):
       def log_stdout():
         line = self._process.stdout.readline()
         while line:
-          # Remove newline via rstrip() to not print an empty line
-          _LOGGER.info(line.rstrip())
+          # The log obtained from stdout is bytes, decode it into string.
+          # Remove newline via rstrip() to not print an empty line.
+          _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
           line = self._process.stdout.readline()
 
       t = threading.Thread(target=log_stdout)