You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/04/25 17:28:37 UTC

[arrow] branch master updated: ARROW-16283: [Go] Cleanup panics in new Buffered Reader

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d85e9024b5 ARROW-16283: [Go] Cleanup panics in new Buffered Reader
d85e9024b5 is described below

commit d85e9024b52e131f8dacde064962d53b04697a6f
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Mon Apr 25 13:30:21 2022 -0400

    ARROW-16283: [Go] Cleanup panics in new Buffered Reader
    
    Closes #12960 from zeroshade/arrow-16823-cleanup-panics
    
    Authored-by: Matthew Topol <mt...@factset.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 go/internal/utils/buf_reader.go | 31 ++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/go/internal/utils/buf_reader.go b/go/internal/utils/buf_reader.go
index cc53ae6570..0b2381da1c 100644
--- a/go/internal/utils/buf_reader.go
+++ b/go/internal/utils/buf_reader.go
@@ -17,7 +17,9 @@
 package utils
 
 import (
+	"bufio"
 	"errors"
+	"fmt"
 	"io"
 )
 
@@ -32,7 +34,12 @@ type bufferedReader struct {
 	err      error
 }
 
+// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
+// except Peek will expand the internal buffer if needed rather than return
+// an error.
 func NewBufferedReader(rd io.Reader, sz int) *bufferedReader {
+	// if rd is already a buffered reader whose buffer is >= the requested size
+	// then just return it as is. no need to make a new object.
 	b, ok := rd.(*bufferedReader)
 	if ok && len(b.buf) >= sz {
 		return b
@@ -62,7 +69,7 @@ func (b *bufferedReader) resizeBuffer(newSize int) {
 	b.resetBuffer()
 }
 
-func (b *bufferedReader) fill() {
+func (b *bufferedReader) fill() error {
 	// slide existing data to the beginning
 	if b.r > 0 {
 		copy(b.buf, b.buf[b.r:b.w])
@@ -71,16 +78,17 @@ func (b *bufferedReader) fill() {
 	}
 
 	if b.w >= len(b.buf) {
-		panic("parquet/bufio: tried to fill full buffer")
+		return fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrBufferFull)
 	}
 
 	n, err := io.ReadAtLeast(b.rd, b.buf[b.w:], 1)
 	if n < 0 {
-		panic("negative read")
+		return fmt.Errorf("arrow/bufferedreader: filling buffer: %w", bufio.ErrNegativeCount)
 	}
 
 	b.w += n
 	b.err = err
+	return nil
 }
 
 func (b *bufferedReader) readErr() error {
@@ -89,8 +97,12 @@ func (b *bufferedReader) readErr() error {
 	return err
 }
 
+// Buffered returns the number of bytes currently buffered
 func (b *bufferedReader) Buffered() int { return b.w - b.r }
 
+// SetBufferSize resets the size of the internal buffer to the desired size.
+// Will return an error if newSize is <= 0 or if newSize is less than the size
+// of the buffered data.
 func (b *bufferedReader) SetBufferSize(newSize int) error {
 	if newSize <= 0 {
 		return errors.New("buffer size should be positive")
@@ -104,9 +116,12 @@ func (b *bufferedReader) SetBufferSize(newSize int) error {
 	return nil
 }
 
+// Peek will buffer and return n bytes from the underlying reader without advancing
+// the reader itself. If n is larger than the current buffer size, the buffer will
+// be expanded to accommodate the extra bytes rather than error.
 func (b *bufferedReader) Peek(n int) ([]byte, error) {
 	if n < 0 {
-		return nil, errors.New("parquet/bufio: negative count")
+		return nil, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
 	}
 
 	if n > len(b.buf) {
@@ -122,9 +137,11 @@ func (b *bufferedReader) Peek(n int) ([]byte, error) {
 	return b.buf[b.r : b.r+n], b.readErr()
 }
 
+// Discard skips the next n bytes either by advancing the internal buffer
+// or by reading that many bytes in and throwing them away.
 func (b *bufferedReader) Discard(n int) (discarded int, err error) {
 	if n < 0 {
-		return 0, errors.New("negative count")
+		return 0, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
 	}
 
 	if n == 0 {
@@ -170,7 +187,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
 			// read directly into p to avoid extra copy
 			n, b.err = b.rd.Read(p)
 			if n < 0 {
-				panic("negative read")
+				return n, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
 			}
 			return n, b.readErr()
 		}
@@ -180,7 +197,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
 		b.r, b.w = 0, 0
 		n, b.err = b.rd.Read(b.buf)
 		if n < 0 {
-			panic("negative read")
+			return n, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
 		}
 		if n == 0 {
 			return 0, b.readErr()