Posted to commits@arrow.apache.org by ze...@apache.org on 2022/08/16 18:25:05 UTC
[arrow] branch master updated: ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ee3a8d8a92 ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
ee3a8d8a92 is described below
commit ee3a8d8a92e5d01c3c3846fb400c5c523c625642
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Tue Aug 16 14:24:59 2022 -0400
ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
Authored-by: Matt Topol <zo...@gmail.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/parquet/file/column_reader.go | 18 +++++++++++++-----
go/parquet/file/column_reader_test.go | 19 ++++++++++++++++++-
go/parquet/file/column_writer_test.go | 20 +++++++++++++++++++-
go/parquet/file/file_reader.go | 23 +++++++++++++++++++++++
go/parquet/file/record_reader.go | 25 +++++++++++++------------
go/parquet/file/row_group_reader.go | 5 ++++-
go/parquet/pqarrow/column_readers.go | 5 +++--
go/parquet/pqarrow/file_reader.go | 6 +++---
8 files changed, 96 insertions(+), 25 deletions(-)
diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
index 32fa21adea..cfc0b22432 100644
--- a/go/parquet/file/column_reader.go
+++ b/go/parquet/file/column_reader.go
@@ -18,6 +18,7 @@ package file
import (
"fmt"
+ "sync"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/internal/utils"
@@ -125,6 +126,7 @@ type columnChunkReader struct {
// the number of values we've decoded so far
numDecoded int64
mem memory.Allocator
+ bufferPool *sync.Pool
decoders map[format.Encoding]encoding.TypedDecoder
decoderTraits encoding.DecoderTraits
@@ -136,8 +138,12 @@ type columnChunkReader struct {
// NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
// provide the pages of data for this column. The type is determined from the column passed in.
-func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
- base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder)}
+//
+// In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
+// usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary; buffers
+// should have `ResizeNoShrink(0)` called on them before being put back into the pool.
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
+ base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
switch descr.PhysicalType() {
case parquet.Types.FixedLenByteArray:
base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
@@ -435,15 +441,17 @@ func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, b
valsRead int64 = 0
)
- // TODO(ARROW-16790): ideally we should re-use a shared pool of buffers to avoid unnecessary memory allocation for skips
- scratch := memory.NewResizableBuffer(c.mem)
+ scratch := c.bufferPool.Get().(*memory.Buffer)
+ defer func() {
+ scratch.ResizeNoShrink(0)
+ c.bufferPool.Put(scratch)
+ }()
bufMult := 1
if c.descr.PhysicalType() == parquet.Types.Boolean {
// for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool
bufMult = 8
}
scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
- defer scratch.Release()
for {
batchSize = utils.Min(batchSize, toskip)
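The hunk above swaps the per-call scratch allocation for a checkout from the shared pool. Below is a minimal, self-contained sketch of that checkout/return pattern; the helper names newBufferPool and skipScratch are illustrative, not part of the commit, and the finalizer used by the test setups further down is omitted for brevity:

    package main

    import (
        "fmt"
        "sync"

        "github.com/apache/arrow/go/v10/arrow/memory"
    )

    // newBufferPool builds a pool matching the contract in the NewColumnReader
    // doc comment: it hands out resizable *memory.Buffer values.
    func newBufferPool(mem memory.Allocator) *sync.Pool {
        return &sync.Pool{
            New: func() interface{} {
                return memory.NewResizableBuffer(mem)
            },
        }
    }

    // skipScratch checks a buffer out of the pool, reserves capacity for the
    // caller, and guarantees the buffer is reset and returned afterwards;
    // the same shape as the defer in skipValues above.
    func skipScratch(pool *sync.Pool, nbytes int, use func(*memory.Buffer)) {
        scratch := pool.Get().(*memory.Buffer)
        defer func() {
            scratch.ResizeNoShrink(0) // drop length, keep capacity
            pool.Put(scratch)
        }()
        scratch.Reserve(nbytes)
        use(scratch)
    }

    func main() {
        pool := newBufferPool(memory.DefaultAllocator)
        skipScratch(pool, 1024, func(buf *memory.Buffer) {
            fmt.Println("scratch capacity:", buf.Cap())
        })
    }

ResizeNoShrink(0) is what keeps this cheap: the buffer's length drops to zero but its capacity survives the round trip through the pool, so the next Get usually needs no fresh allocation.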
diff --git a/go/parquet/file/column_reader_test.go b/go/parquet/file/column_reader_test.go
index eb3409942f..c0b727ed0c 100755
--- a/go/parquet/file/column_reader_test.go
+++ b/go/parquet/file/column_reader_test.go
@@ -20,6 +20,8 @@ import (
"math"
"math/rand"
"reflect"
+ "runtime"
+ "sync"
"testing"
"github.com/apache/arrow/go/v10/arrow/memory"
@@ -173,10 +175,25 @@ type PrimitiveReaderSuite struct {
nvalues int
maxDefLvl int16
maxRepLvl int16
+
+ bufferPool sync.Pool
+}
+
+func (p *PrimitiveReaderSuite) SetupTest() {
+ p.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(mem)
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
}
func (p *PrimitiveReaderSuite) TearDownTest() {
p.clear()
+ p.bufferPool = sync.Pool{}
}
func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
@@ -185,7 +202,7 @@ func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
m.TestData().Set("pages", p.pages)
m.On("Err").Return((error)(nil))
p.pager = m
- p.reader = file.NewColumnReader(d, m, mem)
+ p.reader = file.NewColumnReader(d, m, mem, &p.bufferPool)
}
func (p *PrimitiveReaderSuite) checkResults(typ reflect.Type) {
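The runtime.SetFinalizer call in SetupTest matters because sync.Pool is free to discard idle items at any GC; without the finalizer, a dropped buffer would never have Release called and a leak-checking allocator would flag it. A standalone sketch of the same construction, using a CheckedAllocator only to make outstanding bytes observable:

    package main

    import (
        "fmt"
        "runtime"
        "sync"

        "github.com/apache/arrow/go/v10/arrow/memory"
    )

    func main() {
        mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
        pool := sync.Pool{
            New: func() interface{} {
                buf := memory.NewResizableBuffer(mem)
                // sync.Pool may silently drop idle items at any GC; the
                // finalizer ensures a dropped buffer still releases its
                // allocation instead of leaking it.
                runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
                    obj.Release()
                })
                return buf
            },
        }

        scratch := pool.Get().(*memory.Buffer)
        scratch.Reserve(256)
        scratch.ResizeNoShrink(0) // reset before returning, per the pool contract
        pool.Put(scratch)
        fmt.Println("outstanding bytes:", mem.CurrentAlloc())
    }

Both test suites also reset the pool in teardown (p.bufferPool = sync.Pool{}), which makes any pooled buffers unreachable so their finalizers can eventually release them.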
diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go
index dc141ba1d3..39eeb06f23 100755
--- a/go/parquet/file/column_writer_test.go
+++ b/go/parquet/file/column_writer_test.go
@@ -20,6 +20,8 @@ import (
"bytes"
"math"
"reflect"
+ "runtime"
+ "sync"
"testing"
"github.com/apache/arrow/go/v10/arrow/bitutil"
@@ -223,6 +225,8 @@ type PrimitiveWriterTestSuite struct {
metadata *metadata.ColumnChunkMetaDataBuilder
sink *encoding.BufferWriter
readbuffer *memory.Buffer
+
+ bufferPool sync.Pool
}
func (p *PrimitiveWriterTestSuite) SetupTest() {
@@ -230,12 +234,26 @@ func (p *PrimitiveWriterTestSuite) SetupTest() {
p.props = parquet.NewWriterProperties()
p.SetupSchema(parquet.Repetitions.Required, 1)
p.descr = p.Schema.Column(0)
+
+ p.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(mem)
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+}
+
+func (p *PrimitiveWriterTestSuite) TearDownTest() {
+ p.bufferPool = sync.Pool{}
}
func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) file.ColumnChunkReader {
p.readbuffer = p.sink.Finish()
pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil)
- return file.NewColumnReader(p.descr, pagereader, mem)
+ return file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
}
func (p *PrimitiveWriterTestSuite) buildWriter(_ int64, columnProps parquet.ColumnProperties, version parquet.Version) file.ColumnChunkWriter {
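The writer suite's new TearDownTest mirrors that teardown step: reassigning an empty sync.Pool drops every cached buffer, leaving cleanup to the finalizers attached in SetupTest. A one-line sketch of the idiom, assuming a struct field named bufferPool as in the suites above:

    p.bufferPool = sync.Pool{} // orphan pooled buffers; finalizers release them on GC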
diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go
index a7e6525ace..d9a73faa63 100644
--- a/go/parquet/file/file_reader.go
+++ b/go/parquet/file/file_reader.go
@@ -22,6 +22,8 @@ import (
"fmt"
"io"
"os"
+ "runtime"
+ "sync"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/parquet"
@@ -47,6 +49,8 @@ type Reader struct {
metadata *metadata.FileMetaData
footerOffset int64
fileDecryptor encryption.FileDecryptor
+
+ bufferPool sync.Pool
}
type ReadOption func(*Reader)
@@ -113,6 +117,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
}
+ f.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(f.props.Allocator())
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+
if f.metadata == nil {
return f, f.parseMetaData()
}
@@ -120,6 +134,14 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
return f, nil
}
+// BufferPool returns the internal buffer pool being utilized by this reader.
+// This is primarily for use by the pqarrow.FileReader or anything that builds
+// on top of the Reader and constructs its own ColumnReaders (like the
+// RecordReader).
+func (f *Reader) BufferPool() *sync.Pool {
+ return &f.bufferPool
+}
+
// Close will close the current reader, and if the underlying reader being used
// is an `io.Closer` then Close will be called on it too.
func (f *Reader) Close() error {
@@ -290,5 +312,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
r: f.r,
sourceSz: f.footerOffset,
fileDecryptor: f.fileDecryptor,
+ bufferPool: &f.bufferPool,
}
}
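BufferPool() is the hook for layering: code that builds its own column readers on top of a *file.Reader can share the file's pool instead of allocating private scratch space. A sketch of that manual wiring, assuming a local example.parquet (the file name is hypothetical):

    package main

    import (
        "log"
        "os"

        "github.com/apache/arrow/go/v10/arrow/memory"
        "github.com/apache/arrow/go/v10/parquet/file"
    )

    func main() {
        f, err := os.Open("example.parquet") // hypothetical input
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()

        rdr, err := file.NewParquetReader(f)
        if err != nil {
            log.Fatal(err)
        }
        defer rdr.Close()

        // Build a column reader by hand but reuse the file's pool so skips
        // draw scratch space from the shared buffers.
        pageRdr, err := rdr.RowGroup(0).GetColumnPageReader(0)
        if err != nil {
            log.Fatal(err)
        }
        descr := rdr.MetaData().Schema.Column(0)
        colRdr := file.NewColumnReader(descr, pageRdr, memory.DefaultAllocator, rdr.BufferPool())
        _ = colRdr
    }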
diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
index 7daefac457..3e45ee915f 100755
--- a/go/parquet/file/record_reader.go
+++ b/go/parquet/file/record_reader.go
@@ -18,6 +18,7 @@ package file
import (
"fmt"
+ "sync"
"sync/atomic"
"unsafe"
@@ -127,9 +128,9 @@ type primitiveRecordReader struct {
useValues bool
}
-func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader {
+func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
return primitiveRecordReader{
- ColumnChunkReader: NewColumnReader(descr, nil, mem),
+ ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
values: memory.NewResizableBuffer(mem),
validBits: memory.NewResizableBuffer(mem),
mem: mem,
@@ -326,12 +327,12 @@ func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array {
return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
}
-func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
- pr := createPrimitiveRecordReader(descr, mem)
+ pr := createPrimitiveRecordReader(descr, mem, bufferPool)
return &recordReader{
refCount: 1,
recordReaderImpl: &pr,
@@ -722,7 +723,7 @@ func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array {
return []arrow.Array{fr.bldr.NewArray()}
}
-func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
@@ -731,7 +732,7 @@ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Alloca
return &binaryRecordReader{&recordReader{
recordReaderImpl: &flbaRecordReader{
- createPrimitiveRecordReader(descr, mem),
+ createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
nil,
},
@@ -750,7 +751,7 @@ type byteArrayRecordReader struct {
valueBuf []parquet.ByteArray
}
-func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
@@ -762,7 +763,7 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.A
return &binaryRecordReader{&recordReader{
recordReaderImpl: &byteArrayRecordReader{
- createPrimitiveRecordReader(descr, mem),
+ createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewBinaryBuilder(mem, dt),
nil,
},
@@ -840,13 +841,13 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
// TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done
-func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader {
+func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
switch descr.PhysicalType() {
case parquet.Types.ByteArray:
- return newByteArrayRecordReader(descr, info, mem)
+ return newByteArrayRecordReader(descr, info, mem, bufferPool)
case parquet.Types.FixedLenByteArray:
- return newFLBARecordReader(descr, info, mem)
+ return newFLBARecordReader(descr, info, mem, bufferPool)
default:
- return newRecordReader(descr, info, mem)
+ return newRecordReader(descr, info, mem, bufferPool)
}
}
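All the record-reader constructors now simply thread the pool through to NewColumnReader. For external callers of the exported NewRecordReader, the new final parameter is typically rdr.BufferPool(); a sketch under that assumption, where the helper newSharedRecordReader is hypothetical and LevelInfo is filled from the column's max levels:

    package example

    import (
        "sync"

        "github.com/apache/arrow/go/v10/arrow/memory"
        "github.com/apache/arrow/go/v10/parquet/file"
        "github.com/apache/arrow/go/v10/parquet/schema"
    )

    // newSharedRecordReader is a hypothetical helper: it fills LevelInfo from
    // the column's max levels and threads a shared pool (normally
    // rdr.BufferPool()) into the exported constructor.
    func newSharedRecordReader(descr *schema.Column, pool *sync.Pool) file.RecordReader {
        info := file.LevelInfo{
            DefLevel: descr.MaxDefinitionLevel(),
            RepLevel: descr.MaxRepetitionLevel(),
        }
        return file.NewRecordReader(descr, info, false /* readDict */, memory.DefaultAllocator, pool)
    }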
diff --git a/go/parquet/file/row_group_reader.go b/go/parquet/file/row_group_reader.go
index 71c71ec38e..b2b5bcf155 100644
--- a/go/parquet/file/row_group_reader.go
+++ b/go/parquet/file/row_group_reader.go
@@ -18,6 +18,7 @@ package file
import (
"fmt"
+ "sync"
"github.com/apache/arrow/go/v10/internal/utils"
"github.com/apache/arrow/go/v10/parquet"
@@ -38,6 +39,8 @@ type RowGroupReader struct {
rgMetadata *metadata.RowGroupMetaData
props *parquet.ReaderProperties
fileDecryptor encryption.FileDecryptor
+
+ bufferPool *sync.Pool
}
// MetaData returns the metadata of the current Row Group
@@ -65,7 +68,7 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
if err != nil {
return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
}
- return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil
+ return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
}
func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
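Because row-group readers receive the pool from the parent Reader, callers of the public Column API get the pooling for free. A sketch, again assuming a local example.parquet whose first column is INT64:

    package main

    import (
        "fmt"
        "log"

        "github.com/apache/arrow/go/v10/parquet/file"
    )

    func main() {
        rdr, err := file.OpenParquetFile("example.parquet", false) // hypothetical input
        if err != nil {
            log.Fatal(err)
        }
        defer rdr.Close()

        // Column readers handed out by a row group already carry the
        // file-level pool; no extra wiring is needed.
        col, err := rdr.RowGroup(0).Column(0)
        if err != nil {
            log.Fatal(err)
        }
        i64, ok := col.(*file.Int64ColumnChunkReader)
        if !ok {
            log.Fatal("column 0 is not INT64")
        }
        vals := make([]int64, 128)
        total, read, err := i64.ReadBatch(int64(len(vals)), vals, nil, nil)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("levels:", total, "values:", read)
    }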
diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go
index b298e2b4c9..73577b616e 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -20,6 +20,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
+ "sync"
"sync/atomic"
"time"
"unsafe"
@@ -50,13 +51,13 @@ type leafReader struct {
refCount int64
}
-func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties) (*ColumnReader, error) {
+func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) {
ret := &leafReader{
rctx: rctx,
field: field,
input: input,
descr: input.Descr(),
- recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem),
+ recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
props: props,
refCount: 1,
}
diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go
index 7d345d6187..f62b4571b8 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -210,7 +210,7 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
// greatly improves performance.
// GetFieldReader causes read operations; when issued serially on large numbers of columns,
// this is very time consuming. Get field readers concurrently.
- g,gctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
if !fr.Props.Parallel {
g.SetLimit(1)
}
@@ -482,7 +482,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
return nil, nil
}
- out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props)
+ out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool())
return
}
@@ -499,7 +499,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
// When reading structs with large numbers of columns, the serial load is very slow.
// This is especially true when reading Cloud Storage. Loading concurrently
// greatly improves performance.
- g,gctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
if !fr.Props.Parallel {
g.SetLimit(1)
}
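Since pqarrow picks the pool up through BufferPool(), the high-level read path is unchanged for users. A sketch of an ordinary table read, assuming a local example.parquet:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/arrow/go/v10/arrow/memory"
        "github.com/apache/arrow/go/v10/parquet/file"
        "github.com/apache/arrow/go/v10/parquet/pqarrow"
    )

    func main() {
        rdr, err := file.OpenParquetFile("example.parquet", false) // hypothetical input
        if err != nil {
            log.Fatal(err)
        }
        defer rdr.Close()

        props := pqarrow.ArrowReadProperties{Parallel: true, BatchSize: 1024}
        arrowRdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
        if err != nil {
            log.Fatal(err)
        }

        tbl, err := arrowRdr.ReadTable(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        defer tbl.Release()
        fmt.Println("rows:", tbl.NumRows())
    }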