You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/08/31 19:19:23 UTC
[beam] branch master updated: Implement initial version of the BufferedLogger (#28202)
This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 31c6af443ca Implement initial version of the BufferedLogger (#28202)
31c6af443ca is described below
commit 31c6af443ca758de35a959efa6d705abfe3b508c
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Aug 31 15:19:16 2023 -0400
Implement initial version of the BufferedLogger (#28202)
* Implement initial version of the BufferedLogger
* Use non-pointer for builder, remove constructor
* Restore constructor
---
sdks/go/container/tools/buffered_logging.go | 78 +++++++++++
sdks/go/container/tools/buffered_logging_test.go | 169 +++++++++++++++++++++++
2 files changed, 247 insertions(+)
diff --git a/sdks/go/container/tools/buffered_logging.go b/sdks/go/container/tools/buffered_logging.go
new file mode 100644
index 00000000000..5a810dbfdf1
--- /dev/null
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+ "context"
+ "os"
+ "strings"
+)
+
+const INITIAL_LOG_SIZE int = 255
+
+// BufferedLogger is a wrapper around the FnAPI logging client meant to be used
+// in place of stdout and stderr in bootloader subprocesses. Not intended for
+// Beam end users.
+type BufferedLogger struct {
+ logger *Logger
+ builder strings.Builder
+ logs []string
+}
+
+// NewBufferedLogger returns a new BufferedLogger type by reference.
+func NewBufferedLogger(logger *Logger) *BufferedLogger {
+ return &BufferedLogger{logger: logger}
+}
+
+// Write implements the io.Writer interface, converting input to a string
+// and storing it in the BufferedLogger's buffer. If a logger is not provided,
+// the output is sent directly to os.Stderr.
+func (b *BufferedLogger) Write(p []byte) (int, error) {
+ if b.logger == nil {
+ return os.Stderr.Write(p)
+ }
+ n, err := b.builder.Write(p)
+ if b.logs == nil {
+ b.logs = make([]string, 0, INITIAL_LOG_SIZE)
+ }
+ b.logs = append(b.logs, b.builder.String())
+ b.builder.Reset()
+ return n, err
+}
+
+// FlushAtError flushes the contents of the buffer to the logging
+// service at Error.
+func (b *BufferedLogger) FlushAtError(ctx context.Context) {
+ if b.logger == nil {
+ return
+ }
+ for _, message := range b.logs {
+ b.logger.Errorf(ctx, message)
+ }
+ b.logs = nil
+}
+
+// FlushAtDebug flushes the contents of the buffer to the logging
+// service at Debug.
+func (b *BufferedLogger) FlushAtDebug(ctx context.Context) {
+ if b.logger == nil {
+ return
+ }
+ for _, message := range b.logs {
+ b.logger.Printf(ctx, message)
+ }
+ b.logs = nil
+}
diff --git a/sdks/go/container/tools/buffered_logging_test.go b/sdks/go/container/tools/buffered_logging_test.go
new file mode 100644
index 00000000000..8feef7b413d
--- /dev/null
+++ b/sdks/go/container/tools/buffered_logging_test.go
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+ "context"
+ "testing"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+)
+
+func TestBufferedLogger(t *testing.T) {
+ ctx := context.Background()
+
+ t.Run("write", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ message := []byte("test message")
+ n, err := bl.Write(message)
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(message); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+ if got, want := bl.logs[0], "test message"; got != want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+ })
+
+ t.Run("flush single message", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ message := []byte("test message")
+ n, err := bl.Write(message)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(message); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+
+ bl.FlushAtDebug(ctx)
+
+ received := catcher.msgs[0].GetLogEntries()[0]
+
+ if got, want := received.Message, "test message"; got != want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+
+ if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
+ t.Errorf("got severity %v, want %v", got, want)
+ }
+ })
+
+ t.Run("flush multiple messages", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ messages := []string{"foo", "bar", "baz"}
+
+ for _, message := range messages {
+ messBytes := []byte(message)
+ n, err := bl.Write(messBytes)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(messBytes); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+ }
+
+ bl.FlushAtDebug(ctx)
+
+ received := catcher.msgs[0].GetLogEntries()
+
+ for i, message := range received {
+ if got, want := message.Message, messages[i]; got != want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+
+ if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
+ t.Errorf("got severity %v, want %v", got, want)
+ }
+ }
+ })
+
+ t.Run("flush single message at error", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ message := []byte("test error")
+ n, err := bl.Write(message)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(message); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+
+ bl.FlushAtError(ctx)
+
+ received := catcher.msgs[0].GetLogEntries()[0]
+
+ if got, want := received.Message, "test error"; got != want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+
+ if got, want := received.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
+ t.Errorf("got severity %v, want %v", got, want)
+ }
+ })
+
+ t.Run("flush multiple messages at error", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ messages := []string{"foo", "bar", "baz"}
+
+ for _, message := range messages {
+ messBytes := []byte(message)
+ n, err := bl.Write(messBytes)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(messBytes); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+ }
+
+ bl.FlushAtError(ctx)
+
+ received := catcher.msgs[0].GetLogEntries()
+
+ for i, message := range received {
+ if got, want := message.Message, messages[i]; got != want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+
+ if got, want := message.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
+ t.Errorf("got severity %v, want %v", got, want)
+ }
+ }
+ })
+}