You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/07/23 00:20:59 UTC
[beam] branch master updated: Log level fix on local runner (#22420)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a6f07296fee Log level fix on local runner (#22420)
a6f07296fee is described below
commit a6f07296fee95618c8bad72d68aebe53d00ada6f
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Jul 22 19:20:52 2022 -0500
Log level fix on local runner (#22420)
---
.../portability/fn_api_runner/worker_handlers.py | 15 ++-------
.../runners/portability/local_job_service.py | 6 +++-
.../apache_beam/runners/worker/log_handler.py | 38 +++++++++++++++-------
sdks/python/apache_beam/utils/subprocess_server.py | 5 +--
4 files changed, 36 insertions(+), 28 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 6f94f3efc4e..5aaadbd4387 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -65,6 +65,7 @@ from apache_beam.runners.portability.fn_api_runner.execution import Buffer
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
+from apache_beam.runners.worker.log_handler import LOGENTRY_TO_LOG_LEVEL_MAP
from apache_beam.runners.worker.sdk_worker import _Future
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.utils import proto_utils
@@ -407,24 +408,12 @@ class EmbeddedWorkerHandler(WorkerHandler):
class BasicLoggingService(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
-
- LOG_LEVEL_MAP = {
- beam_fn_api_pb2.LogEntry.Severity.CRITICAL: logging.CRITICAL,
- beam_fn_api_pb2.LogEntry.Severity.ERROR: logging.ERROR,
- beam_fn_api_pb2.LogEntry.Severity.WARN: logging.WARNING,
- beam_fn_api_pb2.LogEntry.Severity.NOTICE: logging.INFO + 1,
- beam_fn_api_pb2.LogEntry.Severity.INFO: logging.INFO,
- beam_fn_api_pb2.LogEntry.Severity.DEBUG: logging.DEBUG,
- beam_fn_api_pb2.LogEntry.Severity.TRACE: logging.DEBUG - 1,
- beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED: logging.NOTSET,
- }
-
def Logging(self, log_messages, context=None):
# type: (Iterable[beam_fn_api_pb2.LogEntry.List], Any) -> Iterator[beam_fn_api_pb2.LogControl]
yield beam_fn_api_pb2.LogControl()
for log_message in log_messages:
for log in log_message.log_entries:
- logging.log(self.LOG_LEVEL_MAP[log.severity], str(log))
+ logging.log(LOGENTRY_TO_LOG_LEVEL_MAP[log.severity], str(log))
class BasicProvisionService(beam_provision_api_pb2_grpc.ProvisionServiceServicer
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 509ebb4e208..84d1ded714d 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -50,6 +50,7 @@ from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.portability.fn_api_runner import worker_handlers
+from apache_beam.runners.worker.log_handler import LOGENTRY_TO_LOG_LEVEL_MAP
from apache_beam.utils import thread_pool_executor
if TYPE_CHECKING:
@@ -367,7 +368,10 @@ class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
def Logging(self, log_bundles, context=None):
for log_bundle in log_bundles:
for log_entry in log_bundle.log_entries:
- _LOGGER.info('Worker: %s', str(log_entry).replace('\n', ' '))
+ _LOGGER.log(
+ LOGENTRY_TO_LOG_LEVEL_MAP[log_entry.severity],
+ 'Worker: %s',
+ str(log_entry).replace('\n', ' '))
return iter([])
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index 75cdcf5fb85..f5347d3884d 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -46,6 +46,29 @@ from apache_beam.utils.sentinel import Sentinel
if TYPE_CHECKING:
from apache_beam.portability.api import endpoints_pb2
+# Mapping from logging levels to LogEntry levels.
+LOG_LEVEL_TO_LOGENTRY_MAP = {
+ logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL,
+ logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR,
+ logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN,
+ logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO,
+ logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG,
+ logging.NOTSET: beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED,
+ -float('inf'): beam_fn_api_pb2.LogEntry.Severity.DEBUG,
+}
+
+# Mapping from LogEntry levels to logging levels
+LOGENTRY_TO_LOG_LEVEL_MAP = {
+ beam_fn_api_pb2.LogEntry.Severity.CRITICAL: logging.CRITICAL,
+ beam_fn_api_pb2.LogEntry.Severity.ERROR: logging.ERROR,
+ beam_fn_api_pb2.LogEntry.Severity.WARN: logging.WARNING,
+ beam_fn_api_pb2.LogEntry.Severity.NOTICE: logging.INFO + 1,
+ beam_fn_api_pb2.LogEntry.Severity.INFO: logging.INFO,
+ beam_fn_api_pb2.LogEntry.Severity.DEBUG: logging.DEBUG,
+ beam_fn_api_pb2.LogEntry.Severity.TRACE: logging.DEBUG - 1,
+ beam_fn_api_pb2.LogEntry.Severity.UNSPECIFIED: logging.NOTSET,
+}
+
# This module is experimental. No backwards-compatibility guarantees.
@@ -60,16 +83,6 @@ class FnApiLogRecordHandler(logging.Handler):
# dropped. If the average log size is 1KB this may use up to 10MB of memory.
_QUEUE_SIZE = 10000
- # Mapping from logging levels to LogEntry levels.
- LOG_LEVEL_MAP = {
- logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL,
- logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR,
- logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN,
- logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO,
- logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG,
- -float('inf'): beam_fn_api_pb2.LogEntry.Severity.DEBUG,
- }
-
def __init__(self, log_service_descriptor):
# type: (endpoints_pb2.ApiServiceDescriptor) -> None
super().__init__()
@@ -101,11 +114,12 @@ class FnApiLogRecordHandler(logging.Handler):
def map_log_level(self, level):
# type: (int) -> beam_fn_api_pb2.LogEntry.Severity.Enum
try:
- return self.LOG_LEVEL_MAP[level]
+ return LOG_LEVEL_TO_LOGENTRY_MAP[level]
except KeyError:
return max(
beam_level for python_level,
- beam_level in self.LOG_LEVEL_MAP.items() if python_level <= level)
+ beam_level in LOG_LEVEL_TO_LOGENTRY_MAP.items()
+ if python_level <= level)
def emit(self, record):
# type: (logging.LogRecord) -> None
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 5341f9653bf..769a5f4fe05 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -121,8 +121,9 @@ class SubprocessServer(object):
def log_stdout():
line = self._process.stdout.readline()
while line:
- # Remove newline via rstrip() to not print an empty line
- _LOGGER.info(line.rstrip())
+ # The log obtained from stdout is bytes, decode it into string.
+ # Remove newline via rstrip() to not print an empty line.
+ _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
line = self._process.stdout.readline()
t = threading.Thread(target=log_stdout)