You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/10/24 10:14:03 UTC
[pulsar-client-go] branch master updated: fix: avoid assert panic (#73)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8a2ff05 fix: avoid assert panic (#73)
8a2ff05 is described below
commit 8a2ff054d85bcbdba433b18639745920bbabc64c
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Thu Oct 24 18:09:44 2019 +0800
fix: avoid assert panic (#73)
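- add comma-ok checks to the type assertions on pendingQueue.PeekLast() and pendingQueue.Peek(), so a failed assertion returns early instead of panicking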
Fixes #64
Change-Id: Ibd355440ccf3b06ac60575a9306cfb66cb80d759
---
pulsar/impl_partition_producer.go | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 07e2da0..f09cd42 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -23,10 +23,11 @@ import (
"sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/util"
- "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
@@ -293,7 +294,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
func (p *partitionProducer) internalFlush(fr *flushRequest) {
p.internalFlushCurrentBatch()
- pi := p.pendingQueue.PeekLast().(*pendingItem)
+ pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
+ if !ok {
+ fr.waitGroup.Done()
+ return
+ }
+
pi.sendRequests = append(pi.sendRequests, &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -349,12 +355,14 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
- pi := p.pendingQueue.Peek().(*pendingItem)
+ pi, ok := p.pendingQueue.Peek().(*pendingItem)
- if pi == nil {
+ if !ok {
p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
return
- } else if pi.sequenceID != response.GetSequenceId() {
+ }
+
+ if pi.sequenceID != response.GetSequenceId() {
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
return