You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/04/25 17:28:37 UTC
[arrow] branch master updated: ARROW-16283: [Go] Cleanup panics in new Buffered Reader
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d85e9024b5 ARROW-16283: [Go] Cleanup panics in new Buffered Reader
d85e9024b5 is described below
commit d85e9024b52e131f8dacde064962d53b04697a6f
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Mon Apr 25 13:30:21 2022 -0400
ARROW-16283: [Go] Cleanup panics in new Buffered Reader
Closes #12960 from zeroshade/arrow-16823-cleanup-panics
Authored-by: Matthew Topol <mt...@factset.com>
Signed-off-by: Matthew Topol <mt...@factset.com>
---
go/internal/utils/buf_reader.go | 31 ++++++++++++++++++++++++-------
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/go/internal/utils/buf_reader.go b/go/internal/utils/buf_reader.go
index cc53ae6570..0b2381da1c 100644
--- a/go/internal/utils/buf_reader.go
+++ b/go/internal/utils/buf_reader.go
@@ -17,7 +17,9 @@
package utils
import (
+ "bufio"
"errors"
+ "fmt"
"io"
)
@@ -32,7 +34,12 @@ type bufferedReader struct {
err error
}
+// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
+// except Peek will expand the internal buffer if needed rather than return
+// an error.
func NewBufferedReader(rd io.Reader, sz int) *bufferedReader {
+ // If rd is already a bufferedReader whose buffer is >= the requested size,
+ // just return it as is; there is no need to make a new object.
b, ok := rd.(*bufferedReader)
if ok && len(b.buf) >= sz {
return b
@@ -62,7 +69,7 @@ func (b *bufferedReader) resizeBuffer(newSize int) {
b.resetBuffer()
}
-func (b *bufferedReader) fill() {
+func (b *bufferedReader) fill() error {
// slide existing data to the beginning
if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w])
@@ -71,16 +78,17 @@ func (b *bufferedReader) fill() {
}
if b.w >= len(b.buf) {
- panic("parquet/bufio: tried to fill full buffer")
+ return fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrBufferFull)
}
n, err := io.ReadAtLeast(b.rd, b.buf[b.w:], 1)
if n < 0 {
- panic("negative read")
+ return fmt.Errorf("arrow/bufferedreader: filling buffer: %w", bufio.ErrNegativeCount)
}
b.w += n
b.err = err
+ return nil
}
func (b *bufferedReader) readErr() error {
@@ -89,8 +97,12 @@ func (b *bufferedReader) readErr() error {
return err
}
+// Buffered returns the number of bytes currently buffered
func (b *bufferedReader) Buffered() int { return b.w - b.r }
+// SetBufferSize resets the size of the internal buffer to the desired size.
+// Will return an error if newSize is <= 0 or if newSize is less than the size
+// of the buffered data.
func (b *bufferedReader) SetBufferSize(newSize int) error {
if newSize <= 0 {
return errors.New("buffer size should be positive")
@@ -104,9 +116,12 @@ func (b *bufferedReader) SetBufferSize(newSize int) error {
return nil
}
+// Peek will buffer and return n bytes from the underlying reader without advancing
+// the reader itself. If n is larger than the current buffer size, the buffer will
+// be expanded to accommodate the extra bytes rather than returning an error.
func (b *bufferedReader) Peek(n int) ([]byte, error) {
if n < 0 {
- return nil, errors.New("parquet/bufio: negative count")
+ return nil, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
}
if n > len(b.buf) {
@@ -122,9 +137,11 @@ func (b *bufferedReader) Peek(n int) ([]byte, error) {
return b.buf[b.r : b.r+n], b.readErr()
}
+// Discard skips the next n bytes either by advancing the internal buffer
+// or by reading that many bytes in and throwing them away.
func (b *bufferedReader) Discard(n int) (discarded int, err error) {
if n < 0 {
- return 0, errors.New("negative count")
+ return 0, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
}
if n == 0 {
@@ -170,7 +187,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
// read directly into p to avoid extra copy
n, b.err = b.rd.Read(p)
if n < 0 {
- panic("negative read")
+ return n, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
}
return n, b.readErr()
}
@@ -180,7 +197,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
b.r, b.w = 0, 0
n, b.err = b.rd.Read(b.buf)
if n < 0 {
- panic("negative read")
+ return n, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
}
if n == 0 {
return 0, b.readErr()