You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/21 02:18:07 UTC

[beam] branch master updated: [BEAM-13082] Re-use dataWriter buffer. (#15762)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c8939b  [BEAM-13082] Re-use dataWriter buffer. (#15762)
9c8939b is described below

commit 9c8939b8011f9003da77df1a10978211d23d3fde
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Wed Oct 20 19:16:38 2021 -0700

    [BEAM-13082] Re-use dataWriter buffer. (#15762)
---
 sdks/go/pkg/beam/core/runtime/harness/datamgr.go   | 11 +++--
 .../pkg/beam/core/runtime/harness/datamgr_test.go  | 55 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 3750621..4aa7424 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -502,6 +502,10 @@ func (w *dataWriter) Close() error {
 	if err != nil {
 		return errors.Wrapf(err, "dataWriter[%v;%v].Close: error flushing buffer of length %d", w.id, w.ch.id, l)
 	}
+	// TODO(BEAM-13082): Consider a sync.Pool to reuse < 64MB buffers.
+	// The dataWriter won't be reused, but may be referenced elsewhere.
+	// Drop the buffer to let it be GC'd.
+	w.buf = nil
 
 	// Now acquire the locks since we're sending.
 	w.ch.mu.Lock()
@@ -523,12 +527,11 @@ func (w *dataWriter) Close() error {
 const largeBufferNotificationThreshold = 1024 * 1024 * 1024 // 1GB
 
 func (w *dataWriter) Flush() error {
-	w.ch.mu.Lock()
-	defer w.ch.mu.Unlock()
-
 	if w.buf == nil {
 		return nil
 	}
+	w.ch.mu.Lock()
+	defer w.ch.mu.Unlock()
 
 	msg := &fnpb.Elements{
 		Data: []*fnpb.Elements_Data{
@@ -542,7 +545,7 @@ func (w *dataWriter) Flush() error {
 	if l := len(w.buf); l > largeBufferNotificationThreshold {
 		log.Infof(context.TODO(), "dataWriter[%v;%v].Flush flushed large buffer of length %d", w.id, w.ch.id, l)
 	}
-	w.buf = nil
+	w.buf = w.buf[:0]
 	return w.send(msg)
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
index 3ee15ed..bb65f7e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -16,6 +16,7 @@
 package harness
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -299,3 +300,57 @@ func TestDataChannelTerminate_Writes(t *testing.T) {
 		})
 	}
 }
+
+type noopDataClient struct {
+}
+
+func (*noopDataClient) Recv() (*fnpb.Elements, error) {
+	return nil, nil
+}
+
+func (*noopDataClient) Send(*fnpb.Elements) error {
+	return nil
+}
+
+func BenchmarkDataWriter(b *testing.B) {
+	fourB := []byte{42, 23, 78, 159}
+	sixteenB := bytes.Repeat(fourB, 4)
+	oneKiloB := bytes.Repeat(sixteenB, 64)
+	oneMegaB := bytes.Repeat(oneKiloB, 1024)
+	benches := []struct {
+		name string
+		data []byte
+	}{
+		{"4B", fourB},
+		{"16B", sixteenB},
+		{"1KB", oneKiloB},
+		{"4KB", bytes.Repeat(oneKiloB, 4)},
+		{"100KB", bytes.Repeat(oneKiloB, 100)},
+		{"1MB", oneMegaB},
+		{"10MB", bytes.Repeat(oneMegaB, 10)},
+		{"100MB", bytes.Repeat(oneMegaB, 100)},
+		{"256MB", bytes.Repeat(oneMegaB, 256)},
+	}
+	for _, bench := range benches {
+		b.Run(bench.name, func(b *testing.B) {
+			ndc := &noopDataClient{}
+			dc := &DataChannel{
+				id:      "dcid",
+				client:  ndc,
+				writers: map[instructionID]map[string]*dataWriter{},
+			}
+			w := dataWriter{
+				ch: dc,
+				id: clientID{
+					ptransformID: "pid",
+					instID:       instructionID("instID"),
+				},
+			}
+
+			for i := 0; i < b.N; i++ {
+				w.Write(bench.data)
+			}
+			w.Close()
+		})
+	}
+}