You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/10/24 10:14:03 UTC

[pulsar-client-go] branch master updated: fix: avoid assert panic (#73)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2ff05  fix: avoid assert panic (#73)
8a2ff05 is described below

commit 8a2ff054d85bcbdba433b18639745920bbabc64c
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Thu Oct 24 18:09:44 2019 +0800

    fix: avoid assert panic (#73)
    
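    - use comma-ok type assertions on pendingQueue.Peek()/PeekLast() and
      return early when the assertion fails, instead of panicking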
    
    Fixes #64
    
    Change-Id: Ibd355440ccf3b06ac60575a9306cfb66cb80d759
---
 pulsar/impl_partition_producer.go | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 07e2da0..f09cd42 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -23,10 +23,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/golang/protobuf/proto"
+
 	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/util"
-	"github.com/golang/protobuf/proto"
 
 	log "github.com/sirupsen/logrus"
 )
@@ -293,7 +294,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.internalFlushCurrentBatch()
 
-	pi := p.pendingQueue.PeekLast().(*pendingItem)
+	pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
+	if !ok {
+		fr.waitGroup.Done()
+		return
+	}
+
 	pi.sendRequests = append(pi.sendRequests, &sendRequest{
 		msg: nil,
 		callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -349,12 +355,14 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
-	pi := p.pendingQueue.Peek().(*pendingItem)
+	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
-	if pi == nil {
+	if !ok {
 		p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
 		return
-	} else if pi.sequenceID != response.GetSequenceId() {
+	}
+
+	if pi.sequenceID != response.GetSequenceId() {
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
 		return