You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/05/11 19:55:51 UTC
[arrow] branch master updated: ARROW-16473: [Go] fixing memory leak in serializedPageReader
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new bb190bbc34 ARROW-16473: [Go] fixing memory leak in serializedPageReader
bb190bbc34 is described below
commit bb190bbc345547aefe4aec3e1bc878c645c2cc50
Author: Min-Young Wu <mw...@gmail.com>
AuthorDate: Wed May 11 15:55:38 2022 -0400
ARROW-16473: [Go] fixing memory leak in serializedPageReader
`parquet/file.serializedPageReader` has a [memory.Buffer](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L299) attribute (presumably to reuse across page reads). But at the end of `serializedPageReader.Next` (in the non-error case), a new `memory.Buffer` is [created](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L615) without releasing the pre-existing `p.buf`, [...]
Existing tests updated to test and catch this (`parquet/file` now uses `CheckedAllocator`).
Closes #13068 from minyoung/user/minyoung/0504-serialized-page-reader-leak
Authored-by: Min-Young Wu <mw...@gmail.com>
Signed-off-by: Matthew Topol <mt...@factset.com>
---
go/parquet/file/page_reader.go | 32 ++++++++++++++++----------------
go/parquet/pqarrow/file_reader_test.go | 7 ++++---
2 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 35e2bda087..dc6a1f10ae 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -296,7 +296,6 @@ type serializedPageReader struct {
codec compress.Codec
curPageHdr *format.PageHeader
- buf *memory.Buffer
pageOrd int16
maxPageHeaderSize int
@@ -326,7 +325,6 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
nrows: nrows,
mem: mem,
codec: codec,
- buf: memory.NewResizableBuffer(mem),
}
rdr.decompressBuffer.Grow(defaultPageHeaderSize)
if ctx != nil {
@@ -345,7 +343,6 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
if p.err != nil {
return
}
- p.buf.ResizeNoShrink(0)
p.decompressBuffer.Reset()
if ctx != nil {
p.cryptoCtx = *ctx
@@ -488,7 +485,9 @@ func (p *serializedPageReader) Next() bool {
p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
}
- p.buf.ResizeNoShrink(lenUncompressed)
+ buf := memory.NewResizableBuffer(p.mem)
+ defer buf.Release()
+ buf.ResizeNoShrink(lenUncompressed)
switch p.curPageHdr.GetType() {
case format.PageType_DICTIONARY_PAGE:
@@ -499,7 +498,7 @@ func (p *serializedPageReader) Next() bool {
return false
}
- data, err := p.decompress(lenCompressed, p.buf.Bytes())
+ data, err := p.decompress(lenCompressed, buf.Bytes())
if err != nil {
p.err = err
return false
@@ -529,7 +528,7 @@ func (p *serializedPageReader) Next() bool {
}
p.rowsSeen += int64(dataHeader.GetNumValues())
- data, err := p.decompress(lenCompressed, p.buf.Bytes())
+ data, err := p.decompress(lenCompressed, buf.Bytes())
if err != nil {
p.err = err
return false
@@ -574,27 +573,30 @@ func (p *serializedPageReader) Next() bool {
return false
}
- var data []byte
+ var pagebuf *memory.Buffer
if compressed {
if levelsBytelen > 0 {
- io.ReadFull(p.r, p.buf.Bytes()[:levelsBytelen])
+ io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
- if data, p.err = p.decompress(lenCompressed-levelsBytelen, p.buf.Bytes()[levelsBytelen:]); p.err != nil {
+ var data []byte
+ if data, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
return false
}
+ pagebuf = memory.NewBufferBytes(data)
} else {
- io.ReadFull(p.r, p.buf.Bytes())
- data = p.buf.Bytes()
+ io.ReadFull(p.r, buf.Bytes())
+ pagebuf = buf
+ pagebuf.Retain()
}
- if len(data) != lenUncompressed {
- p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+ if pagebuf.Len() != lenUncompressed {
+ p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, pagebuf.Len())
return false
}
// make datapage v2
p.curPage = &DataPageV2{
page: page{
- buf: memory.NewBufferBytes(data),
+ buf: pagebuf,
typ: p.curPageHdr.Type,
nvals: dataHeader.GetNumValues(),
encoding: dataHeader.GetEncoding(),
@@ -611,8 +613,6 @@ func (p *serializedPageReader) Next() bool {
// we don't know this page type, we're allowed to skip non-data pages
continue
}
-
- p.buf = memory.NewResizableBuffer(p.mem)
return true
}
diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go
index d60f21a76e..0a3ed623c9 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v9/arrow/array"
"github.com/apache/arrow/go/v9/arrow/decimal128"
"github.com/apache/arrow/go/v9/arrow/memory"
+ "github.com/apache/arrow/go/v9/parquet"
"github.com/apache/arrow/go/v9/parquet/file"
"github.com/apache/arrow/go/v9/parquet/pqarrow"
"github.com/stretchr/testify/assert"
@@ -63,7 +64,7 @@ func TestArrowReaderAdHocReadDecimals(t *testing.T) {
filename := filepath.Join(dataDir, tt.file+".parquet")
require.FileExists(t, filename)
- rdr, err := file.OpenParquetFile(filename, false)
+ rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
require.NoError(t, err)
defer rdr.Close()
arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
@@ -107,7 +108,7 @@ func TestRecordReaderParallel(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
- pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
require.NoError(t, err)
reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
@@ -153,7 +154,7 @@ func TestRecordReaderSerial(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
- pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
require.NoError(t, err)
reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 2}, mem)