You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/21 02:18:07 UTC
[beam] branch master updated: [BEAM-13082] Re-use dataWriter buffer. (#15762)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9c8939b [BEAM-13082] Re-use dataWriter buffer. (#15762)
9c8939b is described below
commit 9c8939b8011f9003da77df1a10978211d23d3fde
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Wed Oct 20 19:16:38 2021 -0700
[BEAM-13082] Re-use dataWriter buffer. (#15762)
---
sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 11 +++--
.../pkg/beam/core/runtime/harness/datamgr_test.go | 55 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 4 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 3750621..4aa7424 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -502,6 +502,10 @@ func (w *dataWriter) Close() error {
if err != nil {
return errors.Wrapf(err, "dataWriter[%v;%v].Close: error flushing buffer of length %d", w.id, w.ch.id, l)
}
+ // TODO(BEAM-13082): Consider a sync.Pool to reuse < 64MB buffers.
+ // The dataWriter won't be reused, but may be referenced elsewhere.
+ // Drop the buffer to let it be GC'd.
+ w.buf = nil
// Now acquire the locks since we're sending.
w.ch.mu.Lock()
@@ -523,12 +527,11 @@ func (w *dataWriter) Close() error {
const largeBufferNotificationThreshold = 1024 * 1024 * 1024 // 1GB
func (w *dataWriter) Flush() error {
- w.ch.mu.Lock()
- defer w.ch.mu.Unlock()
-
if w.buf == nil {
return nil
}
+ w.ch.mu.Lock()
+ defer w.ch.mu.Unlock()
msg := &fnpb.Elements{
Data: []*fnpb.Elements_Data{
@@ -542,7 +545,7 @@ func (w *dataWriter) Flush() error {
if l := len(w.buf); l > largeBufferNotificationThreshold {
log.Infof(context.TODO(), "dataWriter[%v;%v].Flush flushed large buffer of length %d", w.id, w.ch.id, l)
}
- w.buf = nil
+ w.buf = w.buf[:0]
return w.send(msg)
}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
index 3ee15ed..bb65f7e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -16,6 +16,7 @@
package harness
import (
+ "bytes"
"context"
"fmt"
"io"
@@ -299,3 +300,57 @@ func TestDataChannelTerminate_Writes(t *testing.T) {
})
}
}
+
+type noopDataClient struct {
+}
+
+func (*noopDataClient) Recv() (*fnpb.Elements, error) {
+ return nil, nil
+}
+
+func (*noopDataClient) Send(*fnpb.Elements) error {
+ return nil
+}
+
+func BenchmarkDataWriter(b *testing.B) {
+ fourB := []byte{42, 23, 78, 159}
+ sixteenB := bytes.Repeat(fourB, 4)
+ oneKiloB := bytes.Repeat(sixteenB, 64)
+ oneMegaB := bytes.Repeat(oneKiloB, 1024)
+ benches := []struct {
+ name string
+ data []byte
+ }{
+ {"4B", fourB},
+ {"16B", sixteenB},
+ {"1KB", oneKiloB},
+ {"4KB", bytes.Repeat(oneKiloB, 4)},
+ {"100KB", bytes.Repeat(oneKiloB, 100)},
+ {"1MB", oneMegaB},
+ {"10MB", bytes.Repeat(oneMegaB, 10)},
+ {"100MB", bytes.Repeat(oneMegaB, 100)},
+ {"256MB", bytes.Repeat(oneMegaB, 256)},
+ }
+ for _, bench := range benches {
+ b.Run(bench.name, func(b *testing.B) {
+ ndc := &noopDataClient{}
+ dc := &DataChannel{
+ id: "dcid",
+ client: ndc,
+ writers: map[instructionID]map[string]*dataWriter{},
+ }
+ w := dataWriter{
+ ch: dc,
+ id: clientID{
+ ptransformID: "pid",
+ instID: instructionID("instID"),
+ },
+ }
+
+ for i := 0; i < b.N; i++ {
+ w.Write(bench.data)
+ }
+ w.Close()
+ })
+ }
+}