You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/15 03:39:55 UTC
[pulsar-client-go] branch master updated: feature: add internal
connectionReader readAtLeast error information (#237)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b04a842 feature: add internal connectionReader readAtLeast error information (#237)
b04a842 is described below
commit b04a842233c4cb9f1845ff06b721cac15c7b5e10
Author: SkyWalker <39...@users.noreply.github.com>
AuthorDate: Fri May 15 11:39:47 2020 +0800
feature: add internal connectionReader readAtLeast error information (#237)
add internal connectionReader readAtLeast error information
this error information may help to solve #200
---
pulsar/internal/connection_reader.go | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 0485774..10ad97b 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -66,8 +66,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// If the buffer is empty, just go back to write at the beginning
r.buffer.Clear()
}
- if !r.readAtLeast(4) {
- return nil, nil, errors.New("Short read when reading frame size")
+ if err := r.readAtLeast(4); err != nil {
+ return nil, nil, errors.Errorf("Short read when reading frame size: %s", err)
}
}
@@ -82,8 +82,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// Next, we read the rest of the frame
if r.buffer.ReadableBytes() < frameSize {
remainingBytes := frameSize - r.buffer.ReadableBytes()
- if !r.readAtLeast(remainingBytes) {
- return nil, nil, errors.New("Short read when reading frame")
+ if err := r.readAtLeast(remainingBytes); err != nil {
+ return nil, nil, errors.Errorf("Short read when reading frame: %s", err)
}
}
@@ -103,7 +103,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
return cmd, headersAndPayload, nil
}
-func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
+func (r *connectionReader) readAtLeast(size uint32) error {
if r.buffer.WritableBytes() < size {
// There's not enough room in the current buffer to read the requested amount of data
totalFrameSize := r.buffer.ReadableBytes() + size
@@ -120,11 +120,11 @@ func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
r.cnx.TriggerClose()
- return false
+ return err
}
r.buffer.WrittenBytes(uint32(n))
- return true
+ return nil
}
func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {