You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:49 UTC

[pulsar-client-go] 12/38: Queue sending requests and trigger callbacks

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 7851714fb75ab9e62f50f064f9db948d6956f890
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 14:26:11 2019 -0700

    Queue sending requests and trigger callbacks
---
 pulsar/impl/batch_builder.go      | 15 ++++++---
 pulsar/impl/connection.go         | 25 +++++++++++----
 pulsar/impl/util/semaphore.go     | 11 +++++++
 pulsar/impl_partition_producer.go | 65 +++++++++++++++++++++++++++++++--------
 pulsar/message.go                 |  2 +-
 5 files changed, 95 insertions(+), 23 deletions(-)

diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 53bf081..7a16155 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -27,6 +27,7 @@ type BatchBuilder struct {
 
 	cmdSend     *pb.BaseCommand
 	msgMetadata *pb.MessageMetadata
+	callbacks   []interface{}
 }
 
 func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
@@ -46,6 +47,7 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *
 		msgMetadata: &pb.MessageMetadata{
 			ProducerName: &producerName,
 		},
+		callbacks: []interface{}{},
 	}
 }
 
@@ -58,7 +60,8 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 	return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
 }
 
-func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte) bool {
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte,
+	callback interface{}) bool {
 	if bb.hasSpace(payload) {
 		// The current batch is full. Producer has to call Flush() to
 		return false
@@ -75,19 +78,21 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint6
 	serializeSingleMessage(bb.buffer, metadata, payload)
 
 	bb.numMessages += 1
+	bb.callbacks = append(bb.callbacks, callback)
 	return true
 }
 
 func (bb *BatchBuilder) reset() {
 	bb.numMessages = 0
 	bb.buffer.Clear()
+	bb.callbacks = []interface{}{}
 }
 
-func (bb *BatchBuilder) Flush() []byte {
+func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks []interface{}) {
 	log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
 	if bb.numMessages == 0 {
 		// No-Op for empty batch
-		return nil
+		return nil, 0, nil
 	}
 
 	bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
@@ -96,6 +101,8 @@ func (bb *BatchBuilder) Flush() []byte {
 	buffer := NewBuffer(4096)
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
 
+	callbacks = bb.callbacks
+	sequenceId = bb.cmdSend.Send.GetSequenceId()
 	bb.reset()
-	return buffer.ReadableSlice()
+	return buffer.ReadableSlice(), sequenceId, callbacks
 }
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 18a3b63..fa32889 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -16,13 +16,15 @@ import (
 // a consumer) that can register itself to get notified
 // when the connection is closed.
 type ConnectionListener interface {
+	ReceivedSendReceipt(response *pb.CommandSendReceipt)
+
 	ConnectionClosed()
 }
 
 type Connection interface {
 	SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
 	WriteData(data []byte)
-	RegisterListener(listener ConnectionListener)
+	RegisterListener(id uint64, listener ConnectionListener)
 	Close()
 }
 
@@ -65,7 +67,7 @@ type connection struct {
 	incomingRequests chan *request
 	writeRequests    chan []byte
 	pendingReqs      map[uint64]*request
-	listeners        []ConnectionListener
+	listeners        map[uint64]ConnectionListener
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
@@ -81,7 +83,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
 
 		incomingRequests: make(chan *request),
 		writeRequests:    make(chan []byte),
-		listeners:        make([]ConnectionListener, 0),
+		listeners:        make(map[uint64]ConnectionListener),
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -263,8 +265,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 	case pb.BaseCommand_ERROR:
 	case pb.BaseCommand_CLOSE_PRODUCER:
 	case pb.BaseCommand_CLOSE_CONSUMER:
+
 	case pb.BaseCommand_SEND_RECEIPT:
-		c.log.Info("Got SEND_RECEIPT: ", cmd.GetSendReceipt())
+		c.handleSendReceipt(cmd.GetSendReceipt())
 
 	case pb.BaseCommand_SEND_ERROR:
 
@@ -310,6 +313,16 @@ func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand)
 	request.callback(response)
 }
 
+func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
+	c.log.Debug("Got SEND_RECEIPT: ", response)
+	producerId := response.GetProducerId() // listeners are keyed by the id passed to RegisterListener
+	if producer, ok := c.listeners[producerId]; ok { // NOTE(review): map read without c.Lock(), unlike RegisterListener — confirm
+		producer.ReceivedSendReceipt(response) // hand the receipt to the listener registered under this id
+	} else { // no listener under this id: log the receipt and drop it
+		c.log.WithField("producerId", producerId).Warn("Got unexpected send receipt for message: ", response.MessageId)
+	}
+}
+
 func (c *connection) sendPing() {
 	if c.lastDataReceivedTime.Add(2 * keepAliveInterval).Before(time.Now()) {
 		// We have not received a response to the previous Ping request, the
@@ -331,11 +344,11 @@ func (c *connection) handlePing() {
 	c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
 }
 
-func (c *connection) RegisterListener(listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
 	c.Lock()
 	defer c.Unlock()
 
-	c.listeners = append(c.listeners, listener)
+	c.listeners[id] = listener
 }
 
 func (c *connection) Close() {
diff --git a/pulsar/impl/util/semaphore.go b/pulsar/impl/util/semaphore.go
new file mode 100644
index 0000000..9df8cd1
--- /dev/null
+++ b/pulsar/impl/util/semaphore.go
@@ -0,0 +1,11 @@
+package util
+
+type Semaphore chan bool // counting semaphore: every value buffered in the channel is one held permit
+
+func (s Semaphore) Acquire() {
+	s <- true // take a permit; blocks while the channel buffer is full
+}
+
+func (s Semaphore) Release() {
+	<-s // give a permit back; blocks if no permit is currently held
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 60ae76c..cdd34d5 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -5,6 +5,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 	"pulsar-client-go-native/pulsar/impl"
+	"pulsar-client-go-native/pulsar/impl/util"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
 	"time"
@@ -27,6 +28,9 @@ type partitionProducer struct {
 
 	// Channel where app is posting messages to be published
 	eventsChan chan interface{}
+
+	publishSemaphore util.Semaphore
+	pendingQueue     util.BlockingQueue
 }
 
 const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -40,6 +44,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
 	}
 
+	var maxPendingMessages int
+	if options.MaxPendingMessages == 0 {
+		maxPendingMessages = 1000
+	} else {
+		maxPendingMessages = options.MaxPendingMessages
+	}
+
 	p := &partitionProducer{
 		log:              log.WithField("topic", topic),
 		client:           client,
@@ -48,6 +59,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		producerId:       client.rpcClient.NewProducerId(),
 		eventsChan:       make(chan interface{}),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
+		publishSemaphore: make(util.Semaphore, maxPendingMessages),
+		pendingQueue:     util.NewBlockingQueue(maxPendingMessages),
 	}
 
 	if options.Name != "" {
@@ -61,7 +74,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	} else {
 		p.log = p.log.WithField("name", *p.producerName)
 		p.log.Info("Created producer")
-		go p.run()
+		go p.runEventsLoop()
 		return p, nil
 	}
 }
@@ -99,7 +112,7 @@ func (p *partitionProducer) grabCnx() error {
 		p.sequenceIdGenerator = &nextSequenceId
 	}
 	p.cnx = res.Cnx
-	p.cnx.RegisterListener(p)
+	p.cnx.RegisterListener(p.producerId, p)
 	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
 	return nil
 }
@@ -128,8 +141,7 @@ func (p *partitionProducer) reconnectToBroker() {
 	}
 }
 
-
-func (p *partitionProducer) run() {
+func (p *partitionProducer) runEventsLoop() {
 	for {
 		select {
 		case i := <-p.eventsChan:
@@ -177,7 +189,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 
 		sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
-		for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload) == false; {
+		for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload, request) == false; {
 			// The current batch is full.. flush it and retry
 			p.internalFlush()
 		}
@@ -186,12 +198,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+type pendingItem struct { // one flushed batch waiting in pendingQueue for its send receipt
+	batchData   []byte        // serialized batch, the same bytes handed to cnx.WriteData
+	sequenceId  uint64        // sequence id returned by BatchBuilder.Flush; compared against the receipt
+	sendRequest []interface{} // callbacks slice from Flush; entries are asserted to *sendRequest on ack
+}
+
 func (p *partitionProducer) internalFlush() {
-	batchData := p.batchBuilder.Flush()
+	batchData, sequenceId, callbacks := p.batchBuilder.Flush()
 	if batchData == nil {
 		return
 	}
 
+	p.pendingQueue.Put(&pendingItem{batchData, sequenceId, callbacks})
 	p.cnx.WriteData(batchData)
 }
 
@@ -216,14 +235,30 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 	return err
 }
 
-type sendRequest struct {
-	ctx      context.Context
-	msg      *ProducerMessage
-	callback func(MessageID, *ProducerMessage, error)
+func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
+	callback func(MessageID, *ProducerMessage, error)) {
+	p.publishSemaphore.Acquire()
+	p.eventsChan <- &sendRequest{ctx, msg, callback}
 }
 
-func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
-	p.eventsChan <- &sendRequest{ctx, msg, callback}
+// ReceivedSendReceipt matches a broker ack against the oldest pending batch and fires its callbacks.
+func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
+	// Comma-ok form: a plain assertion panics on a nil interface, before the empty-queue check can run.
+	pi, _ := p.pendingQueue.Peek().(*pendingItem)
+	if pi == nil {
+		p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
+		return
+	} else if pi.sequenceId != response.GetSequenceId() {
+		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
+			response.GetSequenceId(), pi.sequenceId)
+		return
+	}
+	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
+	p.pendingQueue.Poll()
+	for _, i := range pi.sendRequest { // NOTE(review): publishSemaphore is not released in this patch — confirm
+		sr := i.(*sendRequest)
+		sr.callback(nil, sr.msg, nil)
+	}
+}
 
 func (p *partitionProducer) LastSequenceID() int64 {
@@ -239,3 +274,9 @@ func (p *partitionProducer) Close() error {
 	p.log.Info("Closing producer")
 	return nil
 }
+
+type sendRequest struct { // one publish request, posted on eventsChan by SendAsync
+	ctx      context.Context                          // caller's context as passed to SendAsync
+	msg      *ProducerMessage                         // message to publish; handed back to callback
+	callback func(MessageID, *ProducerMessage, error) // called from ReceivedSendReceipt once the batch is acked
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 12bc269..e987ad1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -82,7 +82,7 @@ func DeserializeMessageID(data []byte) MessageID {
 }
 
 var (
-// MessageID that points to the earliest message avaialable in a topic
+// MessageID that points to the earliest message available in a topic
 // TODO: EarliestMessage MessageID = earliestMessageID()
 
 // MessageID that points to the latest message