You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/06 02:32:17 UTC
[pulsar-client-go] branch master updated: allow empty payload for nonbatch message (#236)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e7f1673 allow empty payload for nonbatch message (#236)
e7f1673 is described below
commit e7f1673350f208b5063823282d14906d70d66904
Author: Ming <it...@gmail.com>
AuthorDate: Tue May 5 22:32:05 2020 -0400
allow empty payload for nonbatch message (#236)
print out error message from MessageReceived
---
pulsar/internal/commands.go | 2 +-
pulsar/internal/connection.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 8798443..d9f2a1f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -114,7 +114,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
}
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
- if r.buffer.ReadableBytes() == 0 {
+ if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
return nil, nil, ErrEOM
}
if !r.batched {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index dace305..d02deda 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -527,7 +527,7 @@ func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer)
if consumer, ok := c.consumerHandler(consumerID); ok {
err := consumer.MessageReceived(response, payload)
if err != nil {
- c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId)
+ c.log.WithField("consumerID", consumerID).WithError(err).Error("handle message Id: ", response.MessageId)
}
} else {
c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)