You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by ze...@apache.org on 2021/11/18 20:09:41 UTC

[arrow] branch master updated: ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new de5aa54  ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
de5aa54 is described below

commit de5aa544275c9664d17874b1b93de195fb619825
Author: Chris Casola <cc...@factset.com>
AuthorDate: Thu Nov 18 15:06:36 2021 -0500

    ARROW-14717: [Go] Use the ipc.Reader allocator in messageReader
    
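    Allocate message bodies, and the record buffers decoded from them,
    from the memory.Allocator configured through the IPC options (for
    example WithAllocator) instead of using make. The CheckedAllocator
    size counter is now updated with sync/atomic so it stays accurate
    when the allocator is shared by concurrent readers.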
    
    Closes #11712 from chriscasola/ccasola--jira-14717
    
    Authored-by: Chris Casola <cc...@factset.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 go/arrow/internal/arrdata/ioutil.go  |   1 +
 go/arrow/ipc/file_reader.go          |  40 ++++++++++----
 go/arrow/ipc/message.go              |  19 +++++--
 go/arrow/ipc/message_test.go         | 102 +++++++++++++++++++++++++++++++++++
 go/arrow/ipc/reader.go               |   4 +-
 go/arrow/memory/checked_allocator.go |  21 ++++----
 6 files changed, 162 insertions(+), 25 deletions(-)

diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
index a7e3e41..c6a2a39 100644
--- a/go/arrow/internal/arrdata/ioutil.go
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -85,6 +85,7 @@ func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, sc
 			errs <- fmt.Errorf("could not read record %d: %v", i, err)
 			return
 		}
+		defer rec.Release()
 		if !array.RecordEqual(rec, recs[i]) {
 			errs <- fmt.Errorf("records[%d] differ", i)
 		}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 78d8067..b258199 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -47,6 +47,8 @@ type FileReader struct {
 
 	irec int   // current record index. used for the arrio.Reader interface
 	err  error // last error
+
+	mem memory.Allocator
 }
 
 // NewFileReader opens an Arrow file using the provided reader r.
@@ -59,6 +61,7 @@ func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
 			r:      r,
 			fields: make(dictTypeMap),
 			memo:   newMemo(),
+			mem:    cfg.alloc,
 		}
 	)
 
@@ -288,7 +291,7 @@ func (f *FileReader) RecordAt(i int) (array.Record, error) {
 		return nil, xerrors.Errorf("arrow/ipc: message %d is not a Record", i)
 	}
 
-	return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes())), nil
+	return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes()), f.mem), nil
 }
 
 // Read reads the current record from the underlying stream and an error, if any.
@@ -310,7 +313,7 @@ func (f *FileReader) ReadAt(i int64) (array.Record, error) {
 	return f.Record(int(i))
 }
 
-func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
+func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker, mem memory.Allocator) array.Record {
 	var (
 		msg   = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
 		md    flatbuf.RecordBatch
@@ -329,6 +332,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
 			meta:  &md,
 			r:     body,
 			codec: codec,
+			mem:   mem,
 		},
 		max: kMaxNestingDepth,
 	}
@@ -336,6 +340,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr
 	cols := make([]array.Interface, len(schema.Fields()))
 	for i, field := range schema.Fields() {
 		cols[i] = ctx.loadArray(field.Type)
+		defer cols[i].Release()
 	}
 
 	return array.NewRecord(schema, cols, rows)
@@ -345,6 +350,7 @@ type ipcSource struct {
 	meta  *flatbuf.RecordBatch
 	r     ReadAtSeeker
 	codec decompressor
+	mem   memory.Allocator
 }
 
 func (src *ipcSource) buffer(i int) *memory.Buffer {
@@ -356,10 +362,10 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
 		return memory.NewBufferBytes(nil)
 	}
 
-	var raw []byte
+	raw := memory.NewResizableBuffer(src.mem)
 	if src.codec == nil {
-		raw = make([]byte, buf.Length())
-		_, err := src.r.ReadAt(raw, buf.Offset())
+		raw.Resize(int(buf.Length()))
+		_, err := src.r.ReadAt(raw.Bytes(), buf.Offset())
 		if err != nil {
 			panic(err)
 		}
@@ -375,19 +381,19 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
 		var r io.Reader = sr
 		// check for an uncompressed buffer
 		if int64(uncompressedSize) != -1 {
-			raw = make([]byte, uncompressedSize)
+			raw.Resize(int(uncompressedSize))
 			src.codec.Reset(sr)
 			r = src.codec
 		} else {
-			raw = make([]byte, buf.Length())
+			raw.Resize(int(buf.Length()))
 		}
 
-		if _, err = io.ReadFull(r, raw); err != nil {
+		if _, err = io.ReadFull(r, raw.Bytes()); err != nil {
 			panic(err)
 		}
 	}
 
-	return memory.NewBufferBytes(raw)
+	return raw
 }
 
 func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
@@ -507,6 +513,8 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
 		buffers = append(buffers, ctx.buffer())
 	}
 
+	defer releaseBuffers(buffers)
+
 	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
 	defer data.Release()
 
@@ -516,6 +524,7 @@ func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface
 func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
 	field, buffers := ctx.loadCommon(3)
 	buffers = append(buffers, ctx.buffer(), ctx.buffer())
+	defer releaseBuffers(buffers)
 
 	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
 	defer data.Release()
@@ -526,6 +535,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
 func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
 	field, buffers := ctx.loadCommon(2)
 	buffers = append(buffers, ctx.buffer())
+	defer releaseBuffers(buffers)
 
 	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
 	defer data.Release()
@@ -536,6 +546,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType
 func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
 	field, buffers := ctx.loadCommon(2)
 	buffers = append(buffers, ctx.buffer())
+	defer releaseBuffers(buffers)
 
 	sub := ctx.loadChild(dt.ValueType())
 	defer sub.Release()
@@ -549,6 +560,7 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
 func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
 	field, buffers := ctx.loadCommon(2)
 	buffers = append(buffers, ctx.buffer())
+	defer releaseBuffers(buffers)
 
 	sub := ctx.loadChild(dt.Elem())
 	defer sub.Release()
@@ -561,6 +573,7 @@ func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
 
 func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
 	field, buffers := ctx.loadCommon(1)
+	defer releaseBuffers(buffers)
 
 	sub := ctx.loadChild(dt.Elem())
 	defer sub.Release()
@@ -573,6 +586,7 @@ func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) ar
 
 func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) array.Interface {
 	field, buffers := ctx.loadCommon(1)
+	defer releaseBuffers(buffers)
 
 	arrs := make([]array.Interface, len(dt.Fields()))
 	subs := make([]*array.Data, len(dt.Fields()))
@@ -634,3 +648,11 @@ func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int
 
 	panic("not implemented")
 }
+
+func releaseBuffers(buffers []*memory.Buffer) {
+	for _, b := range buffers {
+		if b != nil {
+			b.Release()
+		}
+	}
+}
diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go
index 2eda586..acaddc2 100644
--- a/go/arrow/ipc/message.go
+++ b/go/arrow/ipc/message.go
@@ -154,11 +154,18 @@ type messageReader struct {
 
 	refCount int64
 	msg      *Message
+
+	mem memory.Allocator
 }
 
 // NewMessageReader returns a reader that reads messages from an input stream.
-func NewMessageReader(r io.Reader) MessageReader {
-	return &messageReader{r: r, refCount: 1}
+func NewMessageReader(r io.Reader, opts ...Option) MessageReader {
+	cfg := newConfig()
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	return &messageReader{r: r, refCount: 1, mem: cfg.alloc}
 }
 
 // Retain increases the reference count by 1.
@@ -224,12 +231,14 @@ func (r *messageReader) Message() (*Message, error) {
 	meta := flatbuf.GetRootAsMessage(buf, 0)
 	bodyLen := meta.BodyLength()
 
-	buf = make([]byte, bodyLen)
-	_, err = io.ReadFull(r.r, buf)
+	body := memory.NewResizableBuffer(r.mem)
+	defer body.Release()
+	body.Resize(int(bodyLen))
+
+	_, err = io.ReadFull(r.r, body.Bytes())
 	if err != nil {
 		return nil, xerrors.Errorf("arrow/ipc: could not read message body: %w", err)
 	}
-	body := memory.NewBufferBytes(buf)
 
 	if r.msg != nil {
 		r.msg.Release()
diff --git a/go/arrow/ipc/message_test.go b/go/arrow/ipc/message_test.go
new file mode 100644
index 0000000..22a6873
--- /dev/null
+++ b/go/arrow/ipc/message_test.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/apache/arrow/go/v7/arrow"
+	"github.com/apache/arrow/go/v7/arrow/array"
+	"github.com/apache/arrow/go/v7/arrow/memory"
+)
+
+func TestMessageReaderBodyInAllocator(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	const numRecords = 3
+	buf := writeRecordsIntoBuffer(t, numRecords)
+	r := NewMessageReader(buf, WithAllocator(mem))
+	defer r.Release()
+
+	msgs := make([]*Message, 0)
+	for {
+		m, err := r.Message()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		m.Retain()
+		msgs = append(msgs, m)
+	}
+	if len(msgs) != numRecords+1 {
+		t.Fatalf("expected %d messages but got %d", numRecords+1, len(msgs))
+	}
+
+	if mem.CurrentAlloc() <= 0 {
+		t.Fatal("message bodies should have been allocated")
+	}
+
+	for _, m := range msgs {
+		m.Release()
+	}
+}
+
+func writeRecordsIntoBuffer(t *testing.T, numRecords int) *bytes.Buffer {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	s, recs := getTestRecords(mem, numRecords)
+	buf := new(bytes.Buffer)
+	w := NewWriter(buf, WithAllocator(mem), WithSchema(s))
+	for _, rec := range recs {
+		err := w.Write(rec)
+		rec.Release()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := w.Close(); err != nil {
+		t.Fatal(err)
+	}
+	return buf
+}
+
+func getTestRecords(mem memory.Allocator, numRecords int) (*arrow.Schema, []array.Record) {
+	meta := arrow.NewMetadata([]string{}, []string{})
+	s := arrow.NewSchema([]arrow.Field{
+		{Name: "test-col", Type: arrow.PrimitiveTypes.Int64},
+	}, &meta)
+
+	builder := array.NewRecordBuilder(mem, s)
+	defer builder.Release()
+
+	recs := make([]array.Record, numRecords)
+	for i := 0; i < len(recs); i++ {
+		col := builder.Field(0).(*array.Int64Builder)
+		for i := 0; i < 10; i++ {
+			col.Append(int64(i))
+		}
+		recs[i] = builder.NewRecord()
+	}
+
+	return s, recs
+}
diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go
index 42572b7..1ccfac1 100644
--- a/go/arrow/ipc/reader.go
+++ b/go/arrow/ipc/reader.go
@@ -75,7 +75,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (*Reader, error
 
 // NewReader returns a reader that reads records from an input stream.
 func NewReader(r io.Reader, opts ...Option) (*Reader, error) {
-	return NewReaderFromMessageReader(NewMessageReader(r), opts...)
+	return NewReaderFromMessageReader(NewMessageReader(r, opts...), opts...)
 }
 
 // Err returns the last error encountered during the iteration over the
@@ -176,7 +176,7 @@ func (r *Reader) next() bool {
 		return false
 	}
 
-	r.rec = newRecord(r.schema, msg.meta, bytes.NewReader(msg.body.Bytes()))
+	r.rec = newRecord(r.schema, msg.meta, bytes.NewReader(msg.body.Bytes()), r.mem)
 	return true
 }
 
diff --git a/go/arrow/memory/checked_allocator.go b/go/arrow/memory/checked_allocator.go
index da300ae..06be9bd 100644
--- a/go/arrow/memory/checked_allocator.go
+++ b/go/arrow/memory/checked_allocator.go
@@ -21,12 +21,13 @@ import (
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"unsafe"
 )
 
 type CheckedAllocator struct {
 	mem Allocator
-	sz  int
+	sz  int64
 
 	allocs sync.Map
 }
@@ -35,10 +36,10 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
 	return &CheckedAllocator{mem: mem}
 }
 
-func (a *CheckedAllocator) CurrentAlloc() int { return a.sz }
+func (a *CheckedAllocator) CurrentAlloc() int { return int(atomic.LoadInt64(&a.sz)) }
 
 func (a *CheckedAllocator) Allocate(size int) []byte {
-	a.sz += size
+	atomic.AddInt64(&a.sz, int64(size))
 	out := a.mem.Allocate(size)
 	if size == 0 {
 		return out
@@ -52,7 +53,7 @@ func (a *CheckedAllocator) Allocate(size int) []byte {
 }
 
 func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
-	a.sz += size - len(b)
+	atomic.AddInt64(&a.sz, int64(size-len(b)))
 
 	oldptr := uintptr(unsafe.Pointer(&b[0]))
 	out := a.mem.Reallocate(size, b)
@@ -69,7 +70,7 @@ func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
 }
 
 func (a *CheckedAllocator) Free(b []byte) {
-	a.sz -= len(b)
+	atomic.AddInt64(&a.sz, int64(len(b)*-1))
 	defer a.mem.Free(b)
 
 	if len(b) == 0 {
@@ -127,7 +128,7 @@ func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
 		return true
 	})
 
-	if a.sz != sz {
+	if int(atomic.LoadInt64(&a.sz)) != sz {
 		t.Helper()
 		t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz)
 	}
@@ -139,13 +140,15 @@ type CheckedAllocatorScope struct {
 }
 
 func NewCheckedAllocatorScope(alloc *CheckedAllocator) *CheckedAllocatorScope {
-	return &CheckedAllocatorScope{alloc: alloc, sz: alloc.sz}
+	sz := atomic.LoadInt64(&alloc.sz)
+	return &CheckedAllocatorScope{alloc: alloc, sz: int(sz)}
 }
 
 func (c *CheckedAllocatorScope) CheckSize(t TestingT) {
-	if c.sz != c.alloc.sz {
+	sz := int(atomic.LoadInt64(&c.alloc.sz))
+	if c.sz != sz {
 		t.Helper()
-		t.Errorf("invalid memory size exp=%d, got=%d", c.sz, c.alloc.sz)
+		t.Errorf("invalid memory size exp=%d, got=%d", c.sz, sz)
 	}
 }