You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/04/21 22:34:10 UTC

[arrow] branch master updated: ARROW-16187: [Go][Parquet] Properly utilize BufferedStream and buffer size when reading

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bd5514f52 ARROW-16187: [Go][Parquet] Properly utilize BufferedStream and buffer size when reading
8bd5514f52 is described below

commit 8bd5514f52bf9cc542a389edaf697cbc2c97b752
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Thu Apr 21 18:35:58 2022 -0400

    ARROW-16187: [Go][Parquet] Properly utilize BufferedStream and buffer size when reading
    
    Currently the BufferSize in the `ReaderProperties` isn't utilized properly when enabling BufferedStreams. This fixes that issue so that enabling `BufferedStream` reading via the properties will correctly utilize the given buffer size when reading. The default buffer size is currently 16K, so reads that are larger than that will ignore the buffering and just pull directly from the underlying reader when BufferedStream is enabled, pulling the entire page or otherwise from the reader if BufferedStream is not enabled.
    
    The buffer size can be set larger so that controlled reads can improve performance on high-latency readers without having to use the memory to read the entire column/page/row group into memory.
    
    Closes #12876 from zeroshade/arrow-16187-parquet-buffered-stream
    
    Lead-authored-by: Matthew Topol <mt...@factset.com>
    Co-authored-by: Matthew Topol <zo...@gmail.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 go/internal/utils/buf_reader.go              | 195 +++++++++++++++++++++++++++
 go/parquet/file/column_writer_test.go        |   3 +-
 go/parquet/file/file_reader_test.go          |   3 +-
 go/parquet/file/page_reader.go               |  29 ++--
 go/parquet/internal/encryption/aes.go        |  46 +++++++
 go/parquet/internal/encryption/decryptor.go  |   7 +
 go/parquet/internal/testutils/pagebuilder.go |   2 +-
 go/parquet/reader_properties.go              |  18 ++-
 8 files changed, 276 insertions(+), 27 deletions(-)

diff --git a/go/internal/utils/buf_reader.go b/go/internal/utils/buf_reader.go
new file mode 100644
index 0000000000..cc53ae6570
--- /dev/null
+++ b/go/internal/utils/buf_reader.go
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+	"errors"
+	"io"
+)
+
+// bufferedReader is similar to bufio.Reader except
+// it will expand the buffer if necessary when asked to Peek
+// more bytes than are in the buffer
+type bufferedReader struct {
+	bufferSz int
+	buf      []byte
+	r, w     int
+	rd       io.Reader
+	err      error
+}
+
+func NewBufferedReader(rd io.Reader, sz int) *bufferedReader {
+	b, ok := rd.(*bufferedReader)
+	if ok && len(b.buf) >= sz {
+		return b
+	}
+
+	r := &bufferedReader{
+		rd: rd,
+	}
+	r.resizeBuffer(sz)
+	return r
+}
+
+func (b *bufferedReader) resetBuffer() {
+	if b.buf == nil {
+		b.buf = make([]byte, b.bufferSz)
+	} else if b.bufferSz > cap(b.buf) {
+		buf := b.buf
+		b.buf = make([]byte, b.bufferSz)
+		copy(b.buf, buf)
+	} else {
+		b.buf = b.buf[:b.bufferSz]
+	}
+}
+
+func (b *bufferedReader) resizeBuffer(newSize int) {
+	b.bufferSz = newSize
+	b.resetBuffer()
+}
+
+func (b *bufferedReader) fill() {
+	// slide existing data to the beginning
+	if b.r > 0 {
+		copy(b.buf, b.buf[b.r:b.w])
+		b.w -= b.r
+		b.r = 0
+	}
+
+	if b.w >= len(b.buf) {
+		panic("parquet/bufio: tried to fill full buffer")
+	}
+
+	n, err := io.ReadAtLeast(b.rd, b.buf[b.w:], 1)
+	if n < 0 {
+		panic("negative read")
+	}
+
+	b.w += n
+	b.err = err
+}
+
+func (b *bufferedReader) readErr() error {
+	err := b.err
+	b.err = nil
+	return err
+}
+
+func (b *bufferedReader) Buffered() int { return b.w - b.r }
+
+func (b *bufferedReader) SetBufferSize(newSize int) error {
+	if newSize <= 0 {
+		return errors.New("buffer size should be positive")
+	}
+
+	if b.w >= newSize {
+		return errors.New("cannot shrink read buffer if buffered data remains")
+	}
+
+	b.resizeBuffer(newSize)
+	return nil
+}
+
+func (b *bufferedReader) Peek(n int) ([]byte, error) {
+	if n < 0 {
+		return nil, errors.New("parquet/bufio: negative count")
+	}
+
+	if n > len(b.buf) {
+		if err := b.SetBufferSize(n); err != nil {
+			return nil, err
+		}
+	}
+
+	for b.w-b.r < n && b.w-b.r < len(b.buf) && b.err == nil {
+		b.fill() // b.w-b.r < len(b.buf) => buffer is not full
+	}
+
+	return b.buf[b.r : b.r+n], b.readErr()
+}
+
+func (b *bufferedReader) Discard(n int) (discarded int, err error) {
+	if n < 0 {
+		return 0, errors.New("negative count")
+	}
+
+	if n == 0 {
+		return
+	}
+
+	remain := n
+	for {
+		skip := b.Buffered()
+		if skip == 0 {
+			b.fill()
+			skip = b.Buffered()
+		}
+		if skip > remain {
+			skip = remain
+		}
+		b.r += skip
+		remain -= skip
+		if remain == 0 {
+			return n, nil
+		}
+		if b.err != nil {
+			return n - remain, b.readErr()
+		}
+	}
+}
+
+func (b *bufferedReader) Read(p []byte) (n int, err error) {
+	n = len(p)
+	if n == 0 {
+		if b.Buffered() > 0 {
+			return 0, nil
+		}
+		return 0, b.readErr()
+	}
+
+	if b.r == b.w {
+		if b.err != nil {
+			return 0, b.readErr()
+		}
+		if len(p) >= len(b.buf) {
+			// large read, empty buffer
+			// read directly into p to avoid extra copy
+			n, b.err = b.rd.Read(p)
+			if n < 0 {
+				panic("negative read")
+			}
+			return n, b.readErr()
+		}
+
+		// one read
+		// don't use b.fill
+		b.r, b.w = 0, 0
+		n, b.err = b.rd.Read(b.buf)
+		if n < 0 {
+			panic("negative read")
+		}
+		if n == 0 {
+			return 0, b.readErr()
+		}
+		b.w += n
+	}
+
+	// copy as much as we can
+	n = copy(p, b.buf[b.r:b.w])
+	b.r += n
+	return n, nil
+}
diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go
index aabdcb327d..c510bf3f54 100755
--- a/go/parquet/file/column_writer_test.go
+++ b/go/parquet/file/column_writer_test.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/arrow/go/v8/arrow/bitutil"
 	"github.com/apache/arrow/go/v8/arrow/memory"
+	arrutils "github.com/apache/arrow/go/v8/internal/utils"
 	"github.com/apache/arrow/go/v8/parquet"
 	"github.com/apache/arrow/go/v8/parquet/compress"
 	"github.com/apache/arrow/go/v8/parquet/file"
@@ -233,7 +234,7 @@ func (p *PrimitiveWriterTestSuite) SetupTest() {
 
 func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) file.ColumnChunkReader {
 	p.readbuffer = p.sink.Finish()
-	pagereader, _ := file.NewPageReader(bytes.NewReader(p.readbuffer.Bytes()), nrows, compression, mem, nil)
+	pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil)
 	return file.NewColumnReader(p.descr, pagereader, mem)
 }
 
diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go
index db987e406d..76532c4574 100644
--- a/go/parquet/file/file_reader_test.go
+++ b/go/parquet/file/file_reader_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 
 	"github.com/apache/arrow/go/v8/arrow/memory"
+	"github.com/apache/arrow/go/v8/internal/utils"
 	"github.com/apache/arrow/go/v8/parquet/compress"
 	"github.com/apache/arrow/go/v8/parquet/file"
 	"github.com/apache/arrow/go/v8/parquet/internal/encoding"
@@ -101,7 +102,7 @@ func (p *PageSerdeSuite) SetupTest() {
 func (p *PageSerdeSuite) InitSerializedPageReader(nrows int64, codec compress.Compression) {
 	p.EndStream()
 
-	p.pageReader, _ = file.NewPageReader(bytes.NewReader(p.buffer.Bytes()), nrows, codec, memory.DefaultAllocator, nil)
+	p.pageReader, _ = file.NewPageReader(utils.NewBufferedReader(bytes.NewReader(p.buffer.Bytes()), p.buffer.Len()), nrows, codec, memory.DefaultAllocator, nil)
 }
 
 func (p *PageSerdeSuite) WriteDataPageHeader(maxSerialized int, uncompressed, compressed int32) {
diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 05ad050bc2..0458d06df3 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -46,7 +46,7 @@ type PageReader interface {
 	// nil if there was no error and you just hit the end of the page
 	Err() error
 	// Reset allows reusing a page reader
-	Reset(r io.ReadSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext)
+	Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext)
 }
 
 // Page is an interface for handling DataPages or Dictionary Pages
@@ -289,7 +289,7 @@ func (d *DictionaryPage) Release() {
 func (d *DictionaryPage) IsSorted() bool { return d.sorted }
 
 type serializedPageReader struct {
-	r        io.ReadSeeker
+	r        parquet.BufferedReader
 	nrows    int64
 	rowsSeen int64
 	mem      memory.Allocator
@@ -310,7 +310,7 @@ type serializedPageReader struct {
 }
 
 // NewPageReader returns a page reader for the data which can be read from the provided reader and compression.
-func NewPageReader(r io.ReadSeeker, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
+func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
 	if mem == nil {
 		mem = memory.NewGoAllocator()
 	}
@@ -336,10 +336,10 @@ func NewPageReader(r io.ReadSeeker, nrows int64, compressType compress.Compressi
 	return rdr, nil
 }
 
-func (p *serializedPageReader) Reset(r io.ReadSeeker, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
-	p.rowsSeen, p.pageOrd = 0, 0
+func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext) {
+	p.rowsSeen, p.pageOrd, p.nrows = 0, 0, nrows
 	p.curPageHdr, p.curPage, p.err = nil, nil, nil
-	p.r, p.nrows = r, nrows
+	p.r = r
 
 	p.codec, p.err = compress.GetCodec(compressType)
 	if p.err != nil {
@@ -390,7 +390,6 @@ func (p *serializedPageReader) Page() Page {
 }
 
 func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) {
-	p.decompressBuffer.Reset()
 	p.decompressBuffer.Grow(lenCompressed)
 	if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); err != nil {
 		return nil, err
@@ -444,28 +443,19 @@ func (p *serializedPageReader) Next() bool {
 	p.err = nil
 
 	for p.rowsSeen < p.nrows {
-		// headerSize := 0
 		allowedPgSz := defaultPageHeaderSize
-
-		start, _ := p.r.Seek(0, io.SeekCurrent)
 		p.decompressBuffer.Reset()
-		// Page headers can be very large because of page statistics
-		// We try to deserialize a larger buffer progressively
-		// until a maximum allowed header limit
 		for {
-			n, err := io.CopyN(&p.decompressBuffer, p.r, int64(allowedPgSz))
-			// view, err := p.r.Peek(allowedPgSz)
+			view, err := p.r.Peek(allowedPgSz)
 			if err != nil && err != io.EOF {
 				p.err = err
 				return false
 			}
 
-			if n == 0 {
+			if len(view) == 0 {
 				return false
 			}
 
-			view := p.decompressBuffer.Bytes()
-
 			extra := 0
 			if p.cryptoCtx.MetaDecryptor != nil {
 				p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
@@ -483,7 +473,7 @@ func (p *serializedPageReader) Next() bool {
 				continue
 			}
 
-			p.r.Seek(start+int64(len(view)-int(remaining)+extra), io.SeekStart)
+			p.r.Discard(len(view) - int(remaining) + extra)
 			break
 		}
 
@@ -519,7 +509,6 @@ func (p *serializedPageReader) Next() bool {
 				return false
 			}
 
-			// p.buf.Resize(lenUncompressed)
 			// make dictionary page
 			p.curPage = &DictionaryPage{
 				page: page{
diff --git a/go/parquet/internal/encryption/aes.go b/go/parquet/internal/encryption/aes.go
index 34d6d10c84..4ffb0188c4 100644
--- a/go/parquet/internal/encryption/aes.go
+++ b/go/parquet/internal/encryption/aes.go
@@ -191,6 +191,52 @@ func newAesDecryptor(alg parquet.Cipher, metadata bool) *aesDecryptor {
 // the length of the plaintext after decryption.
 func (a *aesDecryptor) CiphertextSizeDelta() int { return a.ciphertextSizeDelta }
 
+// DecryptFrom reads a little-endian length-prefixed ciphertext from r and
+// returns the plaintext decrypted with the provided key and AAD bytes.
+func (a *aesDecryptor) DecryptFrom(r io.Reader, key, aad []byte) []byte {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		panic(err)
+	}
+
+	var writtenCiphertextLen uint32
+	if err := binary.Read(r, binary.LittleEndian, &writtenCiphertextLen); err != nil {
+		panic(err)
+	}
+
+	cipherText := make([]byte, writtenCiphertextLen)
+	if n, err := io.ReadFull(r, cipherText); n != int(writtenCiphertextLen) || err != nil {
+		panic(err)
+	}
+
+	nonce := cipherText[:NonceLength]
+	cipherText = cipherText[NonceLength:]
+	if a.mode == gcmMode {
+		aead, err := cipher.NewGCM(block)
+		if err != nil {
+			panic(err)
+		}
+
+		plain, err := aead.Open(cipherText[:0], nonce, cipherText, aad)
+		if err != nil {
+			panic(err)
+		}
+		return plain
+	}
+
+	// Parquet CTR IVs are comprised of a 12-byte nonce and a 4-byte initial
+	// counter field.
+	// The first 31 bits of the initial counter field are set to 0, the last bit
+	// is set to 1.
+	iv := make([]byte, ctrIVLen)
+	copy(iv, nonce)
+	iv[ctrIVLen-1] = 1
+
+	stream := cipher.NewCTR(block, iv)
+	// dst := make([]byte, len(cipherText))
+	stream.XORKeyStream(cipherText, cipherText)
+	return cipherText
+}
+
 // Decrypt returns the plaintext version of the given ciphertext when decrypted
 // with the provided key and AAD security bytes.
 func (a *aesDecryptor) Decrypt(cipherText, key, aad []byte) []byte {
diff --git a/go/parquet/internal/encryption/decryptor.go b/go/parquet/internal/encryption/decryptor.go
index cbc167d187..e0832bb19a 100644
--- a/go/parquet/internal/encryption/decryptor.go
+++ b/go/parquet/internal/encryption/decryptor.go
@@ -17,6 +17,8 @@
 package encryption
 
 import (
+	"io"
+
 	"github.com/apache/arrow/go/v8/arrow/memory"
 	"github.com/apache/arrow/go/v8/parquet"
 )
@@ -240,6 +242,8 @@ type Decryptor interface {
 	CiphertextSizeDelta() int
 	// Decrypt just returns the decrypted plaintext from the src ciphertext
 	Decrypt(src []byte) []byte
+	// DecryptFrom reads a length-prefixed ciphertext from r and returns the decrypted plaintext
+	DecryptFrom(r io.Reader) []byte
 	// set the AAD bytes of the decryptor to the provided string
 	UpdateAad(string)
 }
@@ -259,3 +263,6 @@ func (d *decryptor) CiphertextSizeDelta() int    { return d.decryptor.Ciphertext
 func (d *decryptor) Decrypt(src []byte) []byte {
 	return d.decryptor.Decrypt(src, d.key, d.aad)
 }
+func (d *decryptor) DecryptFrom(r io.Reader) []byte {
+	return d.decryptor.DecryptFrom(r, d.key, d.aad)
+}
diff --git a/go/parquet/internal/testutils/pagebuilder.go b/go/parquet/internal/testutils/pagebuilder.go
index ad0c211a07..5ab060a47c 100644
--- a/go/parquet/internal/testutils/pagebuilder.go
+++ b/go/parquet/internal/testutils/pagebuilder.go
@@ -220,7 +220,7 @@ func (m *MockPageReader) Err() error {
 	return m.Called().Error(0)
 }
 
-func (m *MockPageReader) Reset(io.ReadSeeker, int64, compress.Compression, *file.CryptoContext) {
+func (m *MockPageReader) Reset(parquet.BufferedReader, int64, compress.Compression, *file.CryptoContext) {
 }
 
 func (m *MockPageReader) SetMaxPageHeaderSize(int) {}
diff --git a/go/parquet/reader_properties.go b/go/parquet/reader_properties.go
index cba9cd07ca..1f645b16d9 100644
--- a/go/parquet/reader_properties.go
+++ b/go/parquet/reader_properties.go
@@ -22,12 +22,16 @@ import (
 	"io"
 
 	"github.com/apache/arrow/go/v8/arrow/memory"
+	"github.com/apache/arrow/go/v8/internal/utils"
 )
 
 // ReaderProperties are used to define how the file reader will handle buffering and allocating buffers
 type ReaderProperties struct {
 	alloc memory.Allocator
-	// Default buffer size to utilize when reading chunks
+	// Default buffer size to utilize when reading chunks, when reading page
+	// headers or other metadata, this buffer may be increased if necessary
+	// to read in the necessary metadata. The value here is simply the default
+	// initial BufferSize when reading a new chunk.
 	BufferSize int64
 	// create with NewFileDecryptionProperties if dealing with an encrypted file
 	FileDecryptProps *FileDecryptionProperties
@@ -43,6 +47,12 @@ type ReaderProperties struct {
 	BufferedStreamEnabled bool
 }
 
+type BufferedReader interface {
+	Peek(int) ([]byte, error)
+	Discard(int) (int, error)
+	io.Reader
+}
+
 // NewReaderProperties returns the default Reader Properties using the provided allocator.
 //
 // If nil is passed for the allocator, then memory.DefaultAllocator will be used.
@@ -60,9 +70,9 @@ func (r *ReaderProperties) Allocator() memory.Allocator { return r.alloc }
 //
 // If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise it will read the entire section
 // into a buffer in memory and return a bytes.NewReader for that buffer.
-func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (io.ReadSeeker, error) {
+func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) {
 	if r.BufferedStreamEnabled {
-		return io.NewSectionReader(source, start, nbytes), nil
+		return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize)), nil
 	}
 
 	data := make([]byte, nbytes)
@@ -74,5 +84,5 @@ func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (i
 		return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n)
 	}
 
-	return bytes.NewReader(data), nil
+	return utils.NewBufferedReader(bytes.NewReader(data), int(nbytes)), nil
 }