You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/08/16 18:25:05 UTC

[arrow] branch master updated: ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ee3a8d8a92 ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
ee3a8d8a92 is described below

commit ee3a8d8a92e5d01c3c3846fb400c5c523c625642
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Tue Aug 16 14:24:59 2022 -0400

    ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
    
    Authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/parquet/file/column_reader.go      | 18 +++++++++++++-----
 go/parquet/file/column_reader_test.go | 19 ++++++++++++++++++-
 go/parquet/file/column_writer_test.go | 20 +++++++++++++++++++-
 go/parquet/file/file_reader.go        | 23 +++++++++++++++++++++++
 go/parquet/file/record_reader.go      | 25 +++++++++++++------------
 go/parquet/file/row_group_reader.go   |  5 ++++-
 go/parquet/pqarrow/column_readers.go  |  5 +++--
 go/parquet/pqarrow/file_reader.go     |  6 +++---
 8 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
index 32fa21adea..cfc0b22432 100644
--- a/go/parquet/file/column_reader.go
+++ b/go/parquet/file/column_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/apache/arrow/go/v10/arrow/memory"
 	"github.com/apache/arrow/go/v10/internal/utils"
@@ -125,6 +126,7 @@ type columnChunkReader struct {
 	// the number of values we've decoded so far
 	numDecoded int64
 	mem        memory.Allocator
+	bufferPool *sync.Pool
 
 	decoders      map[format.Encoding]encoding.TypedDecoder
 	decoderTraits encoding.DecoderTraits
@@ -136,8 +138,12 @@ type columnChunkReader struct {
 
 // NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
 // provide the pages of data for this column. The type is determined from the column passed in.
-func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
-	base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder)}
+//
+// In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
+// usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary. Buffers
+// should have `ResizeNoShrink(0)` called on them before being put back into the pool.
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
+	base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
 	switch descr.PhysicalType() {
 	case parquet.Types.FixedLenByteArray:
 		base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
@@ -435,15 +441,17 @@ func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, b
 				valsRead  int64 = 0
 			)
 
-			// TODO(ARROW-16790): ideally we should re-use a shared pool of buffers to avoid unnecessary memory allocation for skips
-			scratch := memory.NewResizableBuffer(c.mem)
+			scratch := c.bufferPool.Get().(*memory.Buffer)
+			defer func() {
+				scratch.ResizeNoShrink(0)
+				c.bufferPool.Put(scratch)
+			}()
 			bufMult := 1
 			if c.descr.PhysicalType() == parquet.Types.Boolean {
 				// for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool
 				bufMult = 8
 			}
 			scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
-			defer scratch.Release()
 
 			for {
 				batchSize = utils.Min(batchSize, toskip)
diff --git a/go/parquet/file/column_reader_test.go b/go/parquet/file/column_reader_test.go
index eb3409942f..c0b727ed0c 100755
--- a/go/parquet/file/column_reader_test.go
+++ b/go/parquet/file/column_reader_test.go
@@ -20,6 +20,8 @@ import (
 	"math"
 	"math/rand"
 	"reflect"
+	"runtime"
+	"sync"
 	"testing"
 
 	"github.com/apache/arrow/go/v10/arrow/memory"
@@ -173,10 +175,25 @@ type PrimitiveReaderSuite struct {
 	nvalues         int
 	maxDefLvl       int16
 	maxRepLvl       int16
+
+	bufferPool sync.Pool
+}
+
+func (p *PrimitiveReaderSuite) SetupTest() {
+	p.bufferPool = sync.Pool{
+		New: func() interface{} {
+			buf := memory.NewResizableBuffer(mem)
+			runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+				obj.Release()
+			})
+			return buf
+		},
+	}
 }
 
 func (p *PrimitiveReaderSuite) TearDownTest() {
 	p.clear()
+	p.bufferPool = sync.Pool{}
 }
 
 func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
@@ -185,7 +202,7 @@ func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
 	m.TestData().Set("pages", p.pages)
 	m.On("Err").Return((error)(nil))
 	p.pager = m
-	p.reader = file.NewColumnReader(d, m, mem)
+	p.reader = file.NewColumnReader(d, m, mem, &p.bufferPool)
 }
 
 func (p *PrimitiveReaderSuite) checkResults(typ reflect.Type) {
diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go
index dc141ba1d3..39eeb06f23 100755
--- a/go/parquet/file/column_writer_test.go
+++ b/go/parquet/file/column_writer_test.go
@@ -20,6 +20,8 @@ import (
 	"bytes"
 	"math"
 	"reflect"
+	"runtime"
+	"sync"
 	"testing"
 
 	"github.com/apache/arrow/go/v10/arrow/bitutil"
@@ -223,6 +225,8 @@ type PrimitiveWriterTestSuite struct {
 	metadata   *metadata.ColumnChunkMetaDataBuilder
 	sink       *encoding.BufferWriter
 	readbuffer *memory.Buffer
+
+	bufferPool sync.Pool
 }
 
 func (p *PrimitiveWriterTestSuite) SetupTest() {
@@ -230,12 +234,26 @@ func (p *PrimitiveWriterTestSuite) SetupTest() {
 	p.props = parquet.NewWriterProperties()
 	p.SetupSchema(parquet.Repetitions.Required, 1)
 	p.descr = p.Schema.Column(0)
+
+	p.bufferPool = sync.Pool{
+		New: func() interface{} {
+			buf := memory.NewResizableBuffer(mem)
+			runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+				obj.Release()
+			})
+			return buf
+		},
+	}
+}
+
+func (p *PrimitiveWriterTestSuite) TearDownTest() {
+	p.bufferPool = sync.Pool{}
 }
 
 func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) file.ColumnChunkReader {
 	p.readbuffer = p.sink.Finish()
 	pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil)
-	return file.NewColumnReader(p.descr, pagereader, mem)
+	return file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
 }
 
 func (p *PrimitiveWriterTestSuite) buildWriter(_ int64, columnProps parquet.ColumnProperties, version parquet.Version) file.ColumnChunkWriter {
diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go
index a7e6525ace..d9a73faa63 100644
--- a/go/parquet/file/file_reader.go
+++ b/go/parquet/file/file_reader.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"runtime"
+	"sync"
 
 	"github.com/apache/arrow/go/v10/arrow/memory"
 	"github.com/apache/arrow/go/v10/parquet"
@@ -47,6 +49,8 @@ type Reader struct {
 	metadata      *metadata.FileMetaData
 	footerOffset  int64
 	fileDecryptor encryption.FileDecryptor
+
+	bufferPool sync.Pool
 }
 
 type ReadOption func(*Reader)
@@ -113,6 +117,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
 		f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
 	}
 
+	f.bufferPool = sync.Pool{
+		New: func() interface{} {
+			buf := memory.NewResizableBuffer(f.props.Allocator())
+			runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+				obj.Release()
+			})
+			return buf
+		},
+	}
+
 	if f.metadata == nil {
 		return f, f.parseMetaData()
 	}
@@ -120,6 +134,14 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
 	return f, nil
 }
 
+// BufferPool returns the internal buffer pool being utilized by this reader.
+// This is primarily for use by the pqarrow.FileReader or anything that builds
+// on top of the Reader and constructs its own ColumnReaders (like the
+// RecordReader).
+func (f *Reader) BufferPool() *sync.Pool {
+	return &f.bufferPool
+}
+
 // Close will close the current reader, and if the underlying reader being used
 // is an `io.Closer` then Close will be called on it too.
 func (f *Reader) Close() error {
@@ -290,5 +312,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
 		r:             f.r,
 		sourceSz:      f.footerOffset,
 		fileDecryptor: f.fileDecryptor,
+		bufferPool:    &f.bufferPool,
 	}
 }
diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
index 7daefac457..3e45ee915f 100755
--- a/go/parquet/file/record_reader.go
+++ b/go/parquet/file/record_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
 	"fmt"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 
@@ -127,9 +128,9 @@ type primitiveRecordReader struct {
 	useValues bool
 }
 
-func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader {
+func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
 	return primitiveRecordReader{
-		ColumnChunkReader: NewColumnReader(descr, nil, mem),
+		ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
 		values:            memory.NewResizableBuffer(mem),
 		validBits:         memory.NewResizableBuffer(mem),
 		mem:               mem,
@@ -326,12 +327,12 @@ func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array {
 	return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
 }
 
-func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
 	if mem == nil {
 		mem = memory.DefaultAllocator
 	}
 
-	pr := createPrimitiveRecordReader(descr, mem)
+	pr := createPrimitiveRecordReader(descr, mem, bufferPool)
 	return &recordReader{
 		refCount:         1,
 		recordReaderImpl: &pr,
@@ -722,7 +723,7 @@ func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array {
 	return []arrow.Array{fr.bldr.NewArray()}
 }
 
-func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
 	if mem == nil {
 		mem = memory.DefaultAllocator
 	}
@@ -731,7 +732,7 @@ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Alloca
 
 	return &binaryRecordReader{&recordReader{
 		recordReaderImpl: &flbaRecordReader{
-			createPrimitiveRecordReader(descr, mem),
+			createPrimitiveRecordReader(descr, mem, bufferPool),
 			array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
 			nil,
 		},
@@ -750,7 +751,7 @@ type byteArrayRecordReader struct {
 	valueBuf []parquet.ByteArray
 }
 
-func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
 	if mem == nil {
 		mem = memory.DefaultAllocator
 	}
@@ -762,7 +763,7 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.A
 
 	return &binaryRecordReader{&recordReader{
 		recordReaderImpl: &byteArrayRecordReader{
-			createPrimitiveRecordReader(descr, mem),
+			createPrimitiveRecordReader(descr, mem, bufferPool),
 			array.NewBinaryBuilder(mem, dt),
 			nil,
 		},
@@ -840,13 +841,13 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
 
 // TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done
 
-func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader {
+func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
 	switch descr.PhysicalType() {
 	case parquet.Types.ByteArray:
-		return newByteArrayRecordReader(descr, info, mem)
+		return newByteArrayRecordReader(descr, info, mem, bufferPool)
 	case parquet.Types.FixedLenByteArray:
-		return newFLBARecordReader(descr, info, mem)
+		return newFLBARecordReader(descr, info, mem, bufferPool)
 	default:
-		return newRecordReader(descr, info, mem)
+		return newRecordReader(descr, info, mem, bufferPool)
 	}
 }
diff --git a/go/parquet/file/row_group_reader.go b/go/parquet/file/row_group_reader.go
index 71c71ec38e..b2b5bcf155 100644
--- a/go/parquet/file/row_group_reader.go
+++ b/go/parquet/file/row_group_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/apache/arrow/go/v10/internal/utils"
 	"github.com/apache/arrow/go/v10/parquet"
@@ -38,6 +39,8 @@ type RowGroupReader struct {
 	rgMetadata    *metadata.RowGroupMetaData
 	props         *parquet.ReaderProperties
 	fileDecryptor encryption.FileDecryptor
+
+	bufferPool *sync.Pool
 }
 
 // MetaData returns the metadata of the current Row Group
@@ -65,7 +68,7 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
 	if err != nil {
 		return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
 	}
-	return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil
+	return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
 }
 
 func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go
index b298e2b4c9..73577b616e 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -20,6 +20,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -50,13 +51,13 @@ type leafReader struct {
 	refCount int64
 }
 
-func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties) (*ColumnReader, error) {
+func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) {
 	ret := &leafReader{
 		rctx:      rctx,
 		field:     field,
 		input:     input,
 		descr:     input.Descr(),
-		recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem),
+		recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
 		props:     props,
 		refCount:  1,
 	}
diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go
index 7d345d6187..f62b4571b8 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -210,7 +210,7 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
 	// greatly improves performance.
 	// GetFieldReader causes read operations, when issued serially on large numbers of columns,
 	// this is super time consuming. Get field readers concurrently.
-	g,gctx := errgroup.WithContext(ctx)
+	g, gctx := errgroup.WithContext(ctx)
 	if !fr.Props.Parallel {
 		g.SetLimit(1)
 	}
@@ -482,7 +482,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
 			return nil, nil
 		}
 
-		out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props)
+		out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool())
 		return
 	}
 
@@ -499,7 +499,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
 		// When reading structs with large numbers of columns, the serial load is very slow.
 		// This is especially true when reading Cloud Storage. Loading concurrently
 		// greatly improves performance.
-		g,gctx := errgroup.WithContext(ctx)
+		g, gctx := errgroup.WithContext(ctx)
 		if !fr.Props.Parallel {
 			g.SetLimit(1)
 		}