You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this rendering.
Posted to commits@arrow.apache.org by ze...@apache.org on 2021/11/18 20:09:41 UTC
[arrow] branch master updated: ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new de5aa54 ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
de5aa54 is described below
commit de5aa544275c9664d17874b1b93de195fb619825
Author: Chris Casola <cc...@factset.com>
AuthorDate: Thu Nov 18 15:06:36 2021 -0500
ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
Allocate the body of messages from the memory.Allocator instead
of using make.
Closes #11712 from chriscasola/ccasola--jira-14717
Authored-by: Chris Casola <cc...@factset.com>
Signed-off-by: Matthew Topol <mt...@factset.com>
---
go/arrow/internal/arrdata/ioutil.go | 1 +
go/arrow/ipc/file_reader.go | 40 ++++++++++----
go/arrow/ipc/message.go | 19 +++++--
go/arrow/ipc/message_test.go | 102 +++++++++++++++++++++++++++++++++++
go/arrow/ipc/reader.go | 4 +-
go/arrow/memory/checked_allocator.go | 21 ++++----
6 files changed, 162 insertions(+), 25 deletions(-)
diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
index a7e3e41..c6a2a39 100644
--- a/go/arrow/internal/arrdata/ioutil.go
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -85,6 +85,7 @@ func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, sc
errs <- fmt.Errorf("could not read record %d: %v", i, err)
return
}
+ defer rec.Release()
if !array.RecordEqual(rec, recs[i]) {
errs <- fmt.Errorf("records[%d] differ", i)
}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 78d8067..b258199 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -47,6 +47,8 @@ type FileReader struct {
irec int // current record index. used for the arrio.Reader interface
err error // last error
+
+ mem memory.Allocator
}
// NewFileReader opens an Arrow file using the provided reader r.
@@ -59,6 +61,7 @@ func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
r: r,
fields: make(dictTypeMap),
memo: newMemo(),
+ mem: cfg.alloc,
}
)
@@ -288,7 +291,7 @@ func (f *FileReader) RecordAt(i int) (array.Record, error) {
return nil, xerrors.Errorf("arrow/ipc: message %d is not a Record", i)
}
- return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes())), nil
+ return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes()), f.mem), nil
}
// Read reads the current record from the underlying stream and an error, if any.
@@ -310,7 +313,7 @@ func (f *FileReader) ReadAt(i int64) (array.Record, error) {
return f.Record(int(i))
}
-func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
+func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker, mem memory.Allocator) array.Record {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
md flatbuf.RecordBatch
@@ -329,6 +332,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
meta: &md,
r: body,
codec: codec,
+ mem: mem,
},
max: kMaxNestingDepth,
}
@@ -336,6 +340,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
cols := make([]array.Interface, len(schema.Fields()))
for i, field := range schema.Fields() {
cols[i] = ctx.loadArray(field.Type)
+ defer cols[i].Release()
}
return array.NewRecord(schema, cols, rows)
@@ -345,6 +350,7 @@ type ipcSource struct {
meta *flatbuf.RecordBatch
r ReadAtSeeker
codec decompressor
+ mem memory.Allocator
}
func (src *ipcSource) buffer(i int) *memory.Buffer {
@@ -356,10 +362,10 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
return memory.NewBufferBytes(nil)
}
- var raw []byte
+ raw := memory.NewResizableBuffer(src.mem)
if src.codec == nil {
- raw = make([]byte, buf.Length())
- _, err := src.r.ReadAt(raw, buf.Offset())
+ raw.Resize(int(buf.Length()))
+ _, err := src.r.ReadAt(raw.Bytes(), buf.Offset())
if err != nil {
panic(err)
}
@@ -375,19 +381,19 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
var r io.Reader = sr
// check for an uncompressed buffer
if int64(uncompressedSize) != -1 {
- raw = make([]byte, uncompressedSize)
+ raw.Resize(int(uncompressedSize))
src.codec.Reset(sr)
r = src.codec
} else {
- raw = make([]byte, buf.Length())
+ raw.Resize(int(buf.Length()))
}
- if _, err = io.ReadFull(r, raw); err != nil {
+ if _, err = io.ReadFull(r, raw.Bytes()); err != nil {
panic(err)
}
}
- return memory.NewBufferBytes(raw)
+ return raw
}
func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
@@ -507,6 +513,8 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
buffers = append(buffers, ctx.buffer())
}
+ defer releaseBuffers(buffers)
+
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
@@ -516,6 +524,7 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
field, buffers := ctx.loadCommon(3)
buffers = append(buffers, ctx.buffer(), ctx.buffer())
+ defer releaseBuffers(buffers)
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
@@ -526,6 +535,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
+ defer releaseBuffers(buffers)
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
@@ -536,6 +546,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType
func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
+ defer releaseBuffers(buffers)
sub := ctx.loadChild(dt.ValueType())
defer sub.Release()
@@ -549,6 +560,7 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
+ defer releaseBuffers(buffers)
sub := ctx.loadChild(dt.Elem())
defer sub.Release()
@@ -561,6 +573,7 @@ func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
field, buffers := ctx.loadCommon(1)
+ defer releaseBuffers(buffers)
sub := ctx.loadChild(dt.Elem())
defer sub.Release()
@@ -573,6 +586,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) ar
func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) array.Interface {
field, buffers := ctx.loadCommon(1)
+ defer releaseBuffers(buffers)
arrs := make([]array.Interface, len(dt.Fields()))
subs := make([]*array.Data, len(dt.Fields()))
@@ -634,3 +648,11 @@ func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int
panic("not implemented")
}
+
+// releaseBuffers calls Release on every non-nil buffer in buffers.
+// Nil entries are skipped, so a slice containing absent buffers is accepted.
+func releaseBuffers(buffers []*memory.Buffer) {
+	for i := range buffers {
+		if buf := buffers[i]; buf != nil {
+			buf.Release()
+		}
+	}
+}
diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go
index 2eda586..acaddc2 100644
--- a/go/arrow/ipc/message.go
+++ b/go/arrow/ipc/message.go
@@ -154,11 +154,18 @@ type messageReader struct {
refCount int64
msg *Message
+
+ mem memory.Allocator
}
// NewMessageReader returns a reader that reads messages from an input stream.
-func NewMessageReader(r io.Reader) MessageReader {
- return &messageReader{r: r, refCount: 1}
+func NewMessageReader(r io.Reader, opts ...Option) MessageReader {
+ cfg := newConfig()
+ for _, opt := range opts {
+ opt(cfg)
+ }
+
+ return &messageReader{r: r, refCount: 1, mem: cfg.alloc}
}
// Retain increases the reference count by 1.
@@ -224,12 +231,14 @@ func (r *messageReader) Message() (*Message, error) {
meta := flatbuf.GetRootAsMessage(buf, 0)
bodyLen := meta.BodyLength()
- buf = make([]byte, bodyLen)
- _, err = io.ReadFull(r.r, buf)
+ body := memory.NewResizableBuffer(r.mem)
+ defer body.Release()
+ body.Resize(int(bodyLen))
+
+ _, err = io.ReadFull(r.r, body.Bytes())
if err != nil {
return nil, xerrors.Errorf("arrow/ipc: could not read message body: %w", err)
}
- body := memory.NewBufferBytes(buf)
if r.msg != nil {
r.msg.Release()
diff --git a/go/arrow/ipc/message_test.go b/go/arrow/ipc/message_test.go
new file mode 100644
index 0000000..22a6873
--- /dev/null
+++ b/go/arrow/ipc/message_test.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc
+
+import (
+ "bytes"
+ "io"
+ "testing"
+
+ "github.com/apache/arrow/go/v7/arrow"
+ "github.com/apache/arrow/go/v7/arrow/array"
+ "github.com/apache/arrow/go/v7/arrow/memory"
+)
+
+// TestMessageReaderBodyInAllocator checks that message bodies read through a
+// MessageReader created with WithAllocator are allocated from that allocator.
+func TestMessageReaderBodyInAllocator(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	const numRecords = 3
+	buf := writeRecordsIntoBuffer(t, numRecords)
+	r := NewMessageReader(buf, WithAllocator(mem))
+	defer r.Release()
+
+	// Release every retained message in a defer. Defers run LIFO, so this
+	// happens before r.Release and AssertSize, and it also happens when a
+	// t.Fatal below aborts the test. Otherwise the deferred AssertSize would
+	// report a spurious leak in addition to the real failure.
+	var msgs []*Message
+	defer func() {
+		for _, m := range msgs {
+			m.Release()
+		}
+	}()
+
+	for {
+		m, err := r.Message()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		// The reader releases its previous message when the next one is read,
+		// so take our own reference to keep each body alive.
+		m.Retain()
+		msgs = append(msgs, m)
+	}
+
+	// numRecords record messages plus one more (presumably the schema message).
+	if got, want := len(msgs), numRecords+1; got != want {
+		t.Fatalf("expected %d messages but got %d", want, got)
+	}
+
+	if mem.CurrentAlloc() <= 0 {
+		t.Fatal("message bodies should have been allocated")
+	}
+}
+
+// writeRecordsIntoBuffer writes numRecords test records (see getTestRecords)
+// as an IPC stream into a new bytes.Buffer and returns it. The records are
+// allocated from a private CheckedAllocator, which is asserted to be empty on
+// return.
+func writeRecordsIntoBuffer(t *testing.T, numRecords int) *bytes.Buffer {
+	t.Helper()
+
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	s, recs := getTestRecords(mem, numRecords)
+	// Release all records when returning. t.Fatal still runs deferred calls,
+	// so a write failure does not leave records behind for AssertSize to flag.
+	defer func() {
+		for _, rec := range recs {
+			rec.Release()
+		}
+	}()
+
+	buf := new(bytes.Buffer)
+	w := NewWriter(buf, WithAllocator(mem), WithSchema(s))
+	for i, rec := range recs {
+		if err := w.Write(rec); err != nil {
+			t.Fatalf("could not write record %d: %v", i, err)
+		}
+	}
+	if err := w.Close(); err != nil {
+		t.Fatalf("could not close writer: %v", err)
+	}
+	return buf
+}
+
+// getTestRecords builds numRecords records over a single Int64 column named
+// "test-col". Each record holds the values 0 through 9. The records are built
+// from mem, and the caller is responsible for calling Release on each one.
+func getTestRecords(mem memory.Allocator, numRecords int) (*arrow.Schema, []array.Record) {
+	meta := arrow.NewMetadata([]string{}, []string{})
+	s := arrow.NewSchema([]arrow.Field{
+		{Name: "test-col", Type: arrow.PrimitiveTypes.Int64},
+	}, &meta)
+
+	builder := array.NewRecordBuilder(mem, s)
+	defer builder.Release()
+
+	recs := make([]array.Record, numRecords)
+	for i := range recs {
+		// Fill the single column with 0..9, then cut a record from the builder.
+		// The inner index is v (not i) so the record index is not shadowed.
+		col := builder.Field(0).(*array.Int64Builder)
+		for v := 0; v < 10; v++ {
+			col.Append(int64(v))
+		}
+		recs[i] = builder.NewRecord()
+	}
+
+	return s, recs
+}
diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go
index 42572b7..1ccfac1 100644
--- a/go/arrow/ipc/reader.go
+++ b/go/arrow/ipc/reader.go
@@ -75,7 +75,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (*Reader, error
// NewReader returns a reader that reads records from an input stream.
func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
- return NewReaderFromMessageReader(NewMessageReader(r), opts...)
+ return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
}
// Err returns the last error encountered during the iteration over the
@@ -176,7 +176,7 @@ func (r *Reader) next() bool {
return false
}
- r.rec = newRecord(r.schema, msg.meta, bytes.NewReader(msg.body.Bytes()))
+ r.rec = newRecord(r.schema, msg.meta, bytes.NewReader(msg.body.Bytes()), r.mem)
return true
}
diff --git a/go/arrow/memory/checked_allocator.go b/go/arrow/memory/checked_allocator.go
index da300ae..06be9bd 100644
--- a/go/arrow/memory/checked_allocator.go
+++ b/go/arrow/memory/checked_allocator.go
@@ -21,12 +21,13 @@ import (
"runtime"
"strconv"
"sync"
+ "sync/atomic"
"unsafe"
)
type CheckedAllocator struct {
mem Allocator
- sz int
+ sz int64
allocs sync.Map
}
@@ -35,10 +36,10 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
return &CheckedAllocator{mem: mem}
}
-func (a *CheckedAllocator) CurrentAlloc() int { return a.sz }
+func (a *CheckedAllocator) CurrentAlloc() int { return int(atomic.LoadInt64(&a.sz)) }
func (a *CheckedAllocator) Allocate(size int) []byte {
- a.sz += size
+ atomic.AddInt64(&a.sz, int64(size))
out := a.mem.Allocate(size)
if size == 0 {
return out
@@ -52,7 +53,7 @@ func (a *CheckedAllocator) Allocate(size int) []byte {
}
func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
- a.sz += size - len(b)
+ atomic.AddInt64(&a.sz, int64(size-len(b)))
oldptr := uintptr(unsafe.Pointer(&b[0]))
out := a.mem.Reallocate(size, b)
@@ -69,7 +70,7 @@ func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
}
func (a *CheckedAllocator) Free(b []byte) {
- a.sz -= len(b)
+ atomic.AddInt64(&a.sz, int64(len(b)*-1))
defer a.mem.Free(b)
if len(b) == 0 {
@@ -127,7 +128,7 @@ func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
return true
})
- if a.sz != sz {
+ if int(atomic.LoadInt64(&a.sz)) != sz {
t.Helper()
t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz)
}
@@ -139,13 +140,15 @@ type CheckedAllocatorScope struct {
}
func NewCheckedAllocatorScope(alloc *CheckedAllocator) *CheckedAllocatorScope {
- return &CheckedAllocatorScope{alloc: alloc, sz: alloc.sz}
+ sz := atomic.LoadInt64(&alloc.sz)
+ return &CheckedAllocatorScope{alloc: alloc, sz: int(sz)}
}
func (c *CheckedAllocatorScope) CheckSize(t TestingT) {
- if c.sz != c.alloc.sz {
+ sz := int(atomic.LoadInt64(&c.alloc.sz))
+ if c.sz != sz {
t.Helper()
- t.Errorf("invalid memory size exp=%d, got=%d", c.sz, c.alloc.sz)
+ t.Errorf("invalid memory size exp=%d, got=%d", c.sz, sz)
}
}