Posted to commits@arrow.apache.org by ze...@apache.org on 2022/05/11 19:55:51 UTC

[arrow] branch master updated: ARROW-16473: [Go] fixing memory leak in serializedPageReader

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bb190bbc34 ARROW-16473: [Go] fixing memory leak in serializedPageReader
bb190bbc34 is described below

commit bb190bbc345547aefe4aec3e1bc878c645c2cc50
Author: Min-Young Wu <mw...@gmail.com>
AuthorDate: Wed May 11 15:55:38 2022 -0400

    ARROW-16473: [Go] fixing memory leak in serializedPageReader
    
    `parquet/file.serializedPageReader` has a [memory.Buffer](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L299) attribute (presumably to reuse across page reads). But at the end of `serializedPageReader.Next` (in the non-error case), a new `memory.Buffer` is [created](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L615) without releasing the pre-existing `p.buf`, so the buffer held at that point is leaked on every page read.
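    
    A condensed sketch of the leak and the fix (assembled from the diff
    below, not a complete excerpt):
    
        // Before: at the end of Next(), p.buf was replaced without
        // releasing the buffer it still held, so that allocation was
        // never returned to the allocator.
        p.buf = memory.NewResizableBuffer(p.mem) // previous p.buf leaks
    
        // After: Next() uses a per-call buffer released via defer, and
        // the returned page Retains it only when it shares ownership.
        buf := memory.NewResizableBuffer(p.mem)
        defer buf.Release()
        // ... read page bytes into buf ...
        pagebuf := buf
        pagebuf.Retain() // the page holds its own reference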
    
    Existing tests were updated to catch this (the `parquet/file` tests now use a `CheckedAllocator`).
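    
    A minimal sketch of how a `CheckedAllocator` surfaces such a leak in a
    test (illustrative only; the actual changes are in
    `file_reader_test.go` below):
    
        mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
        defer mem.AssertSize(t, 0) // fails the test if any bytes remain allocated
    
        rdr, err := file.OpenParquetFile(filename, false,
            file.WithReadProps(parquet.NewReaderProperties(mem)))
        require.NoError(t, err)
        defer rdr.Close()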
    
    Closes #13068 from minyoung/user/minyoung/0504-serialized-page-reader-leak
    
    Authored-by: Min-Young Wu <mw...@gmail.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 go/parquet/file/page_reader.go         | 32 ++++++++++++++++----------------
 go/parquet/pqarrow/file_reader_test.go |  7 ++++---
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 35e2bda087..dc6a1f10ae 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -296,7 +296,6 @@ type serializedPageReader struct {
 	codec    compress.Codec
 
 	curPageHdr        *format.PageHeader
-	buf               *memory.Buffer
 	pageOrd           int16
 	maxPageHeaderSize int
 
@@ -326,7 +325,6 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
 		nrows:             nrows,
 		mem:               mem,
 		codec:             codec,
-		buf:               memory.NewResizableBuffer(mem),
 	}
 	rdr.decompressBuffer.Grow(defaultPageHeaderSize)
 	if ctx != nil {
@@ -345,7 +343,6 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
 	if p.err != nil {
 		return
 	}
-	p.buf.ResizeNoShrink(0)
 	p.decompressBuffer.Reset()
 	if ctx != nil {
 		p.cryptoCtx = *ctx
@@ -488,7 +485,9 @@ func (p *serializedPageReader) Next() bool {
 			p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
 		}
 
-		p.buf.ResizeNoShrink(lenUncompressed)
+		buf := memory.NewResizableBuffer(p.mem)
+		defer buf.Release()
+		buf.ResizeNoShrink(lenUncompressed)
 
 		switch p.curPageHdr.GetType() {
 		case format.PageType_DICTIONARY_PAGE:
@@ -499,7 +498,7 @@ func (p *serializedPageReader) Next() bool {
 				return false
 			}
 
-			data, err := p.decompress(lenCompressed, p.buf.Bytes())
+			data, err := p.decompress(lenCompressed, buf.Bytes())
 			if err != nil {
 				p.err = err
 				return false
@@ -529,7 +528,7 @@ func (p *serializedPageReader) Next() bool {
 			}
 
 			p.rowsSeen += int64(dataHeader.GetNumValues())
-			data, err := p.decompress(lenCompressed, p.buf.Bytes())
+			data, err := p.decompress(lenCompressed, buf.Bytes())
 			if err != nil {
 				p.err = err
 				return false
@@ -574,27 +573,30 @@ func (p *serializedPageReader) Next() bool {
 				return false
 			}
 
-			var data []byte
+			var pagebuf *memory.Buffer
 			if compressed {
 				if levelsBytelen > 0 {
-					io.ReadFull(p.r, p.buf.Bytes()[:levelsBytelen])
+					io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
 				}
-				if data, p.err = p.decompress(lenCompressed-levelsBytelen, p.buf.Bytes()[levelsBytelen:]); p.err != nil {
+				var data []byte
+				if data, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
 					return false
 				}
+				pagebuf = memory.NewBufferBytes(data)
 			} else {
-				io.ReadFull(p.r, p.buf.Bytes())
-				data = p.buf.Bytes()
+				io.ReadFull(p.r, buf.Bytes())
+				pagebuf = buf
+				pagebuf.Retain()
 			}
-			if len(data) != lenUncompressed {
-				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+			if pagebuf.Len() != lenUncompressed {
+				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, pagebuf.Len())
 				return false
 			}
 
 			// make datapage v2
 			p.curPage = &DataPageV2{
 				page: page{
-					buf:      memory.NewBufferBytes(data),
+					buf:      pagebuf,
 					typ:      p.curPageHdr.Type,
 					nvals:    dataHeader.GetNumValues(),
 					encoding: dataHeader.GetEncoding(),
@@ -611,8 +613,6 @@ func (p *serializedPageReader) Next() bool {
 			// we don't know this page type, we're allowed to skip non-data pages
 			continue
 		}
-
-		p.buf = memory.NewResizableBuffer(p.mem)
 		return true
 	}
 
diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go
index d60f21a76e..0a3ed623c9 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/arrow/go/v9/arrow/array"
 	"github.com/apache/arrow/go/v9/arrow/decimal128"
 	"github.com/apache/arrow/go/v9/arrow/memory"
+	"github.com/apache/arrow/go/v9/parquet"
 	"github.com/apache/arrow/go/v9/parquet/file"
 	"github.com/apache/arrow/go/v9/parquet/pqarrow"
 	"github.com/stretchr/testify/assert"
@@ -63,7 +64,7 @@ func TestArrowReaderAdHocReadDecimals(t *testing.T) {
 			filename := filepath.Join(dataDir, tt.file+".parquet")
 			require.FileExists(t, filename)
 
-			rdr, err := file.OpenParquetFile(filename, false)
+			rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
 			require.NoError(t, err)
 			defer rdr.Close()
 			arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
@@ -107,7 +108,7 @@ func TestRecordReaderParallel(t *testing.T) {
 	var buf bytes.Buffer
 	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
 
-	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
 	require.NoError(t, err)
 
 	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
@@ -153,7 +154,7 @@ func TestRecordReaderSerial(t *testing.T) {
 	var buf bytes.Buffer
 	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
 
-	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
 	require.NoError(t, err)
 
 	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 2}, mem)