You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/15 03:39:55 UTC

[pulsar-client-go] branch master updated: feature: add internal connectionReader readAtLeast error information (#237)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b04a842  feature: add internal connectionReader readAtLeast error information (#237)
b04a842 is described below

commit b04a842233c4cb9f1845ff06b721cac15c7b5e10
Author: SkyWalker <39...@users.noreply.github.com>
AuthorDate: Fri May 15 11:39:47 2020 +0800

    feature: add internal connectionReader readAtLeast error information (#237)
    
    Add error information to the internal connectionReader readAtLeast.
    This error information may help to solve #200.
---
 pulsar/internal/connection_reader.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 0485774..10ad97b 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -66,8 +66,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
 			// If the buffer is empty, just go back to write at the beginning
 			r.buffer.Clear()
 		}
-		if !r.readAtLeast(4) {
-			return nil, nil, errors.New("Short read when reading frame size")
+		if err := r.readAtLeast(4); err != nil {
+			return nil, nil, errors.Errorf("Short read when reading frame size: %s", err)
 		}
 	}
 
@@ -82,8 +82,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
 	// Next, we read the rest of the frame
 	if r.buffer.ReadableBytes() < frameSize {
 		remainingBytes := frameSize - r.buffer.ReadableBytes()
-		if !r.readAtLeast(remainingBytes) {
-			return nil, nil, errors.New("Short read when reading frame")
+		if err := r.readAtLeast(remainingBytes); err != nil {
+			return nil, nil, errors.Errorf("Short read when reading frame: %s", err)
 		}
 	}
 
@@ -103,7 +103,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
 	return cmd, headersAndPayload, nil
 }
 
-func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
+func (r *connectionReader) readAtLeast(size uint32) error {
 	if r.buffer.WritableBytes() < size {
 		// There's not enough room in the current buffer to read the requested amount of data
 		totalFrameSize := r.buffer.ReadableBytes() + size
@@ -120,11 +120,11 @@ func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
 	n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
 	if err != nil {
 		r.cnx.TriggerClose()
-		return false
+		return err
 	}
 
 	r.buffer.WrittenBytes(uint32(n))
-	return true
+	return nil
 }
 
 func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {