You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/08/31 19:19:23 UTC

[beam] branch master updated: Implement initial version of the BufferedLogger (#28202)

This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 31c6af443ca Implement initial version of the BufferedLogger (#28202)
31c6af443ca is described below

commit 31c6af443ca758de35a959efa6d705abfe3b508c
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Aug 31 15:19:16 2023 -0400

    Implement initial version of the BufferedLogger (#28202)
    
    * Implement initial version of the BufferedLogger
    
    * Use non-pointer for builder, remove constructor
    
    * Restore constructor
---
 sdks/go/container/tools/buffered_logging.go      |  78 +++++++++++
 sdks/go/container/tools/buffered_logging_test.go | 169 +++++++++++++++++++++++
 2 files changed, 247 insertions(+)

diff --git a/sdks/go/container/tools/buffered_logging.go b/sdks/go/container/tools/buffered_logging.go
new file mode 100644
index 00000000000..5a810dbfdf1
--- /dev/null
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+	"context"
+	"os"
+	"strings"
+)
+
+// INITIAL_LOG_SIZE is the capacity, in messages, that Write reserves for a
+// BufferedLogger's message slice the first time a message is buffered.
+//
+// NOTE(review): Go naming convention prefers MixedCaps (InitialLogSize); the
+// current name is exported, so it is left unchanged here.
+const INITIAL_LOG_SIZE int = 255
+
+// BufferedLogger is a wrapper around the FnAPI logging client meant to be used
+// in place of stdout and stderr in bootloader subprocesses. Not intended for
+// Beam end users.
+//
+// Each call to Write is stored as one pending message; the messages are only
+// handed to the wrapped Logger when FlushAtError or FlushAtDebug is called.
+//
+// Fields: logger is the destination used on flush (when it is nil, Write
+// bypasses the buffer and writes to os.Stderr); builder is scratch space Write
+// uses to turn the incoming bytes into a string; logs holds the messages
+// written since the last flush, oldest first.
+type BufferedLogger struct {
+	logger  *Logger
+	builder strings.Builder
+	logs    []string
+}
+
+// NewBufferedLogger allocates a BufferedLogger that buffers output destined
+// for the given logger and returns a pointer to it. The message buffer starts
+// out empty.
+func NewBufferedLogger(logger *Logger) *BufferedLogger {
+	bl := new(BufferedLogger)
+	bl.logger = logger
+	return bl
+}
+
+// Write implements io.Writer. When the BufferedLogger has a logger, the bytes
+// in p are converted to a string and appended to the buffer as one message;
+// nothing is sent to the logger until one of the Flush methods is called.
+// When there is no logger, p is written straight to os.Stderr and the result
+// of that write is returned.
+func (b *BufferedLogger) Write(p []byte) (int, error) {
+	// Without a logger there is nowhere to flush to, so do not buffer.
+	if b.logger == nil {
+		return os.Stderr.Write(p)
+	}
+
+	// Convert the bytes to a string via the scratch builder, then clear the
+	// builder so the next call starts from empty.
+	written, writeErr := b.builder.Write(p)
+	entry := b.builder.String()
+	b.builder.Reset()
+
+	// Reserve the initial capacity lazily, on the first buffered message
+	// (and again on the first message after a flush sets logs back to nil).
+	if b.logs == nil {
+		b.logs = make([]string, 0, INITIAL_LOG_SIZE)
+	}
+	b.logs = append(b.logs, entry)
+
+	return written, writeErr
+}
+
+// FlushAtError sends every buffered message to the logging service at Error
+// severity, in the order the messages were written, and then empties the
+// buffer. It does nothing when the BufferedLogger has no logger.
+func (b *BufferedLogger) FlushAtError(ctx context.Context) {
+	if b.logger == nil {
+		return
+	}
+	for _, message := range b.logs {
+		// The message is raw subprocess output, so pass it as an argument
+		// rather than as the format string; otherwise any '%' in it would be
+		// treated as a formatting verb and the logged text would be garbled.
+		b.logger.Errorf(ctx, "%s", message)
+	}
+	b.logs = nil
+}
+
+// FlushAtDebug sends every buffered message to the logging service at Debug
+// severity (via the logger's Printf), in the order the messages were written,
+// and then empties the buffer. It does nothing when the BufferedLogger has no
+// logger.
+func (b *BufferedLogger) FlushAtDebug(ctx context.Context) {
+	if b.logger == nil {
+		return
+	}
+	for _, message := range b.logs {
+		// The message is raw subprocess output, so pass it as an argument
+		// rather than as the format string; otherwise any '%' in it would be
+		// treated as a formatting verb and the logged text would be garbled.
+		b.logger.Printf(ctx, "%s", message)
+	}
+	b.logs = nil
+}
diff --git a/sdks/go/container/tools/buffered_logging_test.go b/sdks/go/container/tools/buffered_logging_test.go
new file mode 100644
index 00000000000..8feef7b413d
--- /dev/null
+++ b/sdks/go/container/tools/buffered_logging_test.go
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+	"context"
+	"testing"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+)
+
+// TestBufferedLogger checks that Write buffers each call as one message and
+// that FlushAtDebug and FlushAtError deliver every buffered message, in order
+// and at the expected severity, to the logging client and then empty the
+// buffer.
+func TestBufferedLogger(t *testing.T) {
+	ctx := context.Background()
+
+	// writeAll writes each message with its own Write call and reports any
+	// error or short write.
+	writeAll := func(t *testing.T, bl *BufferedLogger, messages []string) {
+		t.Helper()
+		for _, message := range messages {
+			messBytes := []byte(message)
+			n, err := bl.Write(messBytes)
+			if err != nil {
+				t.Errorf("got error %v", err)
+			}
+			if got, want := n, len(messBytes); got != want {
+				t.Errorf("got %d bytes written, want %d", got, want)
+			}
+		}
+	}
+
+	t.Run("write", func(t *testing.T) {
+		catcher := &logCatcher{}
+		l := &Logger{client: catcher}
+		bl := NewBufferedLogger(l)
+
+		writeAll(t, bl, []string{"test message"})
+
+		// Check the length first so a missing entry is a test failure rather
+		// than an index-out-of-range panic.
+		if got, want := len(bl.logs), 1; got != want {
+			t.Fatalf("got %d buffered messages, want %d", got, want)
+		}
+		if got, want := bl.logs[0], "test message"; got != want {
+			t.Errorf("got message %q, want %q", got, want)
+		}
+	})
+
+	flushTests := []struct {
+		name     string
+		messages []string
+		atError  bool // flush with FlushAtError instead of FlushAtDebug
+	}{
+		{name: "flush single message", messages: []string{"test message"}},
+		{name: "flush multiple messages", messages: []string{"foo", "bar", "baz"}},
+		{name: "flush single message at error", messages: []string{"test error"}, atError: true},
+		{name: "flush multiple messages at error", messages: []string{"foo", "bar", "baz"}, atError: true},
+	}
+
+	for _, tc := range flushTests {
+		t.Run(tc.name, func(t *testing.T) {
+			catcher := &logCatcher{}
+			l := &Logger{client: catcher}
+			bl := NewBufferedLogger(l)
+
+			writeAll(t, bl, tc.messages)
+
+			wantSeverity := fnpb.LogEntry_Severity_DEBUG
+			if tc.atError {
+				wantSeverity = fnpb.LogEntry_Severity_ERROR
+				bl.FlushAtError(ctx)
+			} else {
+				bl.FlushAtDebug(ctx)
+			}
+
+			if got := len(bl.logs); got != 0 {
+				t.Errorf("got %d buffered messages after flush, want 0", got)
+			}
+
+			// Walk every entry of every captured batch so the check does not
+			// depend on how the logger groups entries when sending, and so
+			// that all messages are verified, not only those in the first
+			// batch.
+			seen := 0
+			for _, msg := range catcher.msgs {
+				for _, entry := range msg.GetLogEntries() {
+					if seen >= len(tc.messages) {
+						t.Fatalf("received more than the %d expected log entries", len(tc.messages))
+					}
+					if got, want := entry.Message, tc.messages[seen]; got != want {
+						t.Errorf("got message %q, want %q", got, want)
+					}
+					if got, want := entry.Severity, wantSeverity; got != want {
+						t.Errorf("got severity %v, want %v", got, want)
+					}
+					seen++
+				}
+			}
+			// Without this count check, dropped messages would go unnoticed.
+			if got, want := seen, len(tc.messages); got != want {
+				t.Errorf("got %d log entries, want %d", got, want)
+			}
+		})
+	}
+}