You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dc...@apache.org on 2020/06/08 11:32:28 UTC

[thrift] branch master updated: THRIFT-5214: Partial rewrite of TFramedTransport

This is an automated email from the ASF dual-hosted git repository.

dcelasun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new c9890cb  THRIFT-5214: Partial rewrite of TFramedTransport
c9890cb is described below

commit c9890cb873f127137abd513a7ebdf0186ee8c88c
Author: Yuxuan 'fishy' Wang <yu...@reddit.com>
AuthorDate: Mon Jun 8 04:32:21 2020 -0700

    THRIFT-5214: Partial rewrite of TFramedTransport
    
    Client: go
    
    While debugging the issue in THRIFT-5214, I originally thought that was a
    bug in the TFramedTransport implementation, so I took some time to
    scrutinize the TFramedTransport code. Although in the end there's no bug,
    the current implementation of TFramedTransport, especially in the Read
    function, has some weird handling at the frame boundary, which is both
    error-prone and hard to follow (I did find and fix one bug there in
    https://github.com/apache/thrift/pull/1810 before).
    
    The new implementation reads the whole frame into a buffer, which would
    use slightly more memory, but it follows the pattern of TFramedTransport
    implementations in other languages (e.g. Java), and also the pattern we
    use to handle frames in THeaderTransport. It also reduces the complexity
    and weirdness of the frame boundary handling in the Read implementation.
    
    This rewrite also removes the print call from library code.
---
 lib/go/thrift/framed_transport.go | 108 ++++++++++++++++----------------------
 1 file changed, 44 insertions(+), 64 deletions(-)

diff --git a/lib/go/thrift/framed_transport.go b/lib/go/thrift/framed_transport.go
index 34275b5..f192075 100644
--- a/lib/go/thrift/framed_transport.go
+++ b/lib/go/thrift/framed_transport.go
@@ -32,11 +32,14 @@ const DEFAULT_MAX_LENGTH = 16384000
 
 type TFramedTransport struct {
 	transport TTransport
-	buf       bytes.Buffer
-	reader    *bufio.Reader
-	frameSize uint32 //Current remaining size of the frame. if ==0 read next frame header
-	buffer    [4]byte
 	maxLength uint32
+
+	writeBuf bytes.Buffer
+
+	reader  *bufio.Reader
+	readBuf bytes.Buffer
+
+	buffer [4]byte
 }
 
 type tFramedTransportFactory struct {
@@ -80,89 +83,65 @@ func (p *TFramedTransport) Close() error {
 	return p.transport.Close()
 }
 
-func (p *TFramedTransport) Read(buf []byte) (l int, err error) {
-	if p.frameSize == 0 {
-		p.frameSize, err = p.readFrameHeader()
-		if err != nil {
-			return
-		}
+func (p *TFramedTransport) Read(buf []byte) (read int, err error) {
+	read, err = p.readBuf.Read(buf)
+	if err != io.EOF {
+		return
 	}
-	if p.frameSize < uint32(len(buf)) {
-		frameSize := p.frameSize
-		tmp := make([]byte, p.frameSize)
-		l, err = p.Read(tmp)
-		copy(buf, tmp)
-		if err == nil {
-			// Note: It's important to only return an error when l
-			// is zero.
-			// In io.Reader.Read interface, it's perfectly fine to
-			// return partial data and nil error, which means
-			// "This is all the data we have right now without
-			// blocking. If you need the full data, call Read again
-			// or use io.ReadFull instead".
-			// Returning partial data with an error actually means
-			// there's no more data after the partial data just
-			// returned, which is not true in this case
-			// (it might be that the other end just haven't written
-			// them yet).
-			if l == 0 {
-				err = NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", frameSize, len(buf)))
-			}
-			return
-		}
+
+	// For bytes.Buffer.Read, EOF would only happen when read is zero,
+	// but still, do a sanity check,
+	// in case that behavior is changed in a future version of go stdlib.
+	// When that happens, just return nil error,
+	// and let the caller call Read again to read the next frame.
+	if read > 0 {
+		return read, nil
 	}
-	got, err := p.reader.Read(buf)
-	p.frameSize = p.frameSize - uint32(got)
-	//sanity check
-	if p.frameSize < 0 {
-		return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Negative frame size")
+
+	// Reaching here means that the last Read finished the last frame,
+	// so we need to read the next frame into readBuf now.
+	if err = p.readFrame(); err != nil {
+		return read, err
 	}
-	return got, NewTTransportExceptionFromError(err)
+	newRead, err := p.Read(buf[read:])
+	return read + newRead, err
 }
 
 func (p *TFramedTransport) ReadByte() (c byte, err error) {
-	if p.frameSize == 0 {
-		p.frameSize, err = p.readFrameHeader()
-		if err != nil {
-			return
-		}
-	}
-	if p.frameSize < 1 {
-		return 0, NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", p.frameSize, 1))
-	}
-	c, err = p.reader.ReadByte()
-	if err == nil {
-		p.frameSize--
+	buf := p.buffer[:1]
+	_, err = p.Read(buf)
+	if err != nil {
+		return
 	}
+	c = buf[0]
 	return
 }
 
 func (p *TFramedTransport) Write(buf []byte) (int, error) {
-	n, err := p.buf.Write(buf)
+	n, err := p.writeBuf.Write(buf)
 	return n, NewTTransportExceptionFromError(err)
 }
 
 func (p *TFramedTransport) WriteByte(c byte) error {
-	return p.buf.WriteByte(c)
+	return p.writeBuf.WriteByte(c)
 }
 
 func (p *TFramedTransport) WriteString(s string) (n int, err error) {
-	return p.buf.WriteString(s)
+	return p.writeBuf.WriteString(s)
 }
 
 func (p *TFramedTransport) Flush(ctx context.Context) error {
-	size := p.buf.Len()
+	size := p.writeBuf.Len()
 	buf := p.buffer[:4]
 	binary.BigEndian.PutUint32(buf, uint32(size))
 	_, err := p.transport.Write(buf)
 	if err != nil {
-		p.buf.Truncate(0)
+		p.writeBuf.Reset()
 		return NewTTransportExceptionFromError(err)
 	}
 	if size > 0 {
-		if n, err := p.buf.WriteTo(p.transport); err != nil {
-			print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err.Error(), "\n")
-			p.buf.Truncate(0)
+		if _, err := io.Copy(p.transport, &p.writeBuf); err != nil {
+			p.writeBuf.Reset()
 			return NewTTransportExceptionFromError(err)
 		}
 	}
@@ -170,18 +149,19 @@ func (p *TFramedTransport) Flush(ctx context.Context) error {
 	return NewTTransportExceptionFromError(err)
 }
 
-func (p *TFramedTransport) readFrameHeader() (uint32, error) {
+func (p *TFramedTransport) readFrame() error {
 	buf := p.buffer[:4]
 	if _, err := io.ReadFull(p.reader, buf); err != nil {
-		return 0, err
+		return err
 	}
 	size := binary.BigEndian.Uint32(buf)
 	if size < 0 || size > p.maxLength {
-		return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
+		return NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
 	}
-	return size, nil
+	_, err := io.CopyN(&p.readBuf, p.reader, int64(size))
+	return NewTTransportExceptionFromError(err)
 }
 
 func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) {
-	return uint64(p.frameSize)
+	return uint64(p.readBuf.Len())
 }