You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:49 UTC
[pulsar-client-go] 12/38: Queue sending requests and trigger callbacks
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 7851714fb75ab9e62f50f064f9db948d6956f890
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 14:26:11 2019 -0700
Queue sending requests and trigger callbacks
---
pulsar/impl/batch_builder.go | 15 ++++++---
pulsar/impl/connection.go | 25 +++++++++++----
pulsar/impl/util/semaphore.go | 11 +++++++
pulsar/impl_partition_producer.go | 65 +++++++++++++++++++++++++++++++--------
pulsar/message.go | 2 +-
5 files changed, 95 insertions(+), 23 deletions(-)
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 53bf081..7a16155 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -27,6 +27,7 @@ type BatchBuilder struct {
cmdSend *pb.BaseCommand
msgMetadata *pb.MessageMetadata
+ callbacks []interface{}
}
func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
@@ -46,6 +47,7 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *
msgMetadata: &pb.MessageMetadata{
ProducerName: &producerName,
},
+ callbacks: []interface{}{},
}
}
@@ -58,7 +60,8 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
}
-func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte) bool {
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte,
+ callback interface{}) bool {
if bb.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
@@ -75,19 +78,21 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint6
serializeSingleMessage(bb.buffer, metadata, payload)
bb.numMessages += 1
+ bb.callbacks = append(bb.callbacks, callback)
return true
}
func (bb *BatchBuilder) reset() {
bb.numMessages = 0
bb.buffer.Clear()
+ bb.callbacks = []interface{}{}
}
-func (bb *BatchBuilder) Flush() []byte {
+func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks []interface{}) {
log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
if bb.numMessages == 0 {
// No-Op for empty batch
- return nil
+ return nil, 0, nil
}
bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
@@ -96,6 +101,8 @@ func (bb *BatchBuilder) Flush() []byte {
buffer := NewBuffer(4096)
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
+ callbacks = bb.callbacks
+ sequenceId = bb.cmdSend.Send.GetSequenceId()
bb.reset()
- return buffer.ReadableSlice()
+ return buffer.ReadableSlice(), sequenceId, callbacks
}
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 18a3b63..fa32889 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -16,13 +16,15 @@ import (
// a consumer) that can register itself to get notified
// when the connection is closed.
type ConnectionListener interface {
+ ReceivedSendReceipt(response *pb.CommandSendReceipt)
+
ConnectionClosed()
}
type Connection interface {
SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
WriteData(data []byte)
- RegisterListener(listener ConnectionListener)
+ RegisterListener(id uint64, listener ConnectionListener)
Close()
}
@@ -65,7 +67,7 @@ type connection struct {
incomingRequests chan *request
writeRequests chan []byte
pendingReqs map[uint64]*request
- listeners []ConnectionListener
+ listeners map[uint64]ConnectionListener
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
@@ -81,7 +83,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
incomingRequests: make(chan *request),
writeRequests: make(chan []byte),
- listeners: make([]ConnectionListener, 0),
+ listeners: make(map[uint64]ConnectionListener),
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -263,8 +265,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_ERROR:
case pb.BaseCommand_CLOSE_PRODUCER:
case pb.BaseCommand_CLOSE_CONSUMER:
+
case pb.BaseCommand_SEND_RECEIPT:
- c.log.Info("Got SEND_RECEIPT: ", cmd.GetSendReceipt())
+ c.handleSendReceipt(cmd.GetSendReceipt())
case pb.BaseCommand_SEND_ERROR:
@@ -310,6 +313,16 @@ func (c *connection) handleResponse(requestId uint64, response *pb.BaseCommand)
request.callback(response)
}
+func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
+ c.log.Debug("Got SEND_RECEIPT: ", response)
+ producerId := response.GetProducerId()
+ if producer, ok := c.listeners[producerId]; ok {
+ producer.ReceivedSendReceipt(response)
+ } else {
+ c.log.WithField("producerId", producerId).Warn("Got unexpected send receipt for message: ", response.MessageId)
+ }
+}
+
func (c *connection) sendPing() {
if c.lastDataReceivedTime.Add(2 * keepAliveInterval).Before(time.Now()) {
// We have not received a response to the previous Ping request, the
@@ -331,11 +344,11 @@ func (c *connection) handlePing() {
c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
}
-func (c *connection) RegisterListener(listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
c.Lock()
defer c.Unlock()
- c.listeners = append(c.listeners, listener)
+ c.listeners[id] = listener
}
func (c *connection) Close() {
diff --git a/pulsar/impl/util/semaphore.go b/pulsar/impl/util/semaphore.go
new file mode 100644
index 0000000..9df8cd1
--- /dev/null
+++ b/pulsar/impl/util/semaphore.go
@@ -0,0 +1,11 @@
+package util
+
+type Semaphore chan bool
+
+func (s Semaphore) Acquire() {
+ s <- true
+}
+
+func (s Semaphore) Release() {
+ <-s
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 60ae76c..cdd34d5 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -5,6 +5,7 @@ import (
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"pulsar-client-go-native/pulsar/impl"
+ "pulsar-client-go-native/pulsar/impl/util"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
"time"
@@ -27,6 +28,9 @@ type partitionProducer struct {
// Channel where app is posting messages to be published
eventsChan chan interface{}
+
+ publishSemaphore util.Semaphore
+ pendingQueue util.BlockingQueue
}
const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -40,6 +44,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
+ var maxPendingMessages int
+ if options.MaxPendingMessages == 0 {
+ maxPendingMessages = 1000
+ } else {
+ maxPendingMessages = options.MaxPendingMessages
+ }
+
p := &partitionProducer{
log: log.WithField("topic", topic),
client: client,
@@ -48,6 +59,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
producerId: client.rpcClient.NewProducerId(),
eventsChan: make(chan interface{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
+ publishSemaphore: make(util.Semaphore, maxPendingMessages),
+ pendingQueue: util.NewBlockingQueue(maxPendingMessages),
}
if options.Name != "" {
@@ -61,7 +74,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
} else {
p.log = p.log.WithField("name", *p.producerName)
p.log.Info("Created producer")
- go p.run()
+ go p.runEventsLoop()
return p, nil
}
}
@@ -99,7 +112,7 @@ func (p *partitionProducer) grabCnx() error {
p.sequenceIdGenerator = &nextSequenceId
}
p.cnx = res.Cnx
- p.cnx.RegisterListener(p)
+ p.cnx.RegisterListener(p.producerId, p)
p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
return nil
}
@@ -128,8 +141,7 @@ func (p *partitionProducer) reconnectToBroker() {
}
}
-
-func (p *partitionProducer) run() {
+func (p *partitionProducer) runEventsLoop() {
for {
select {
case i := <-p.eventsChan:
@@ -177,7 +189,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
- for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload) == false; {
+ for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload, request) == false; {
// The current batch is full.. flush it and retry
p.internalFlush()
}
@@ -186,12 +198,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}
+type pendingItem struct {
+ batchData []byte
+ sequenceId uint64
+ sendRequest []interface{}
+}
+
func (p *partitionProducer) internalFlush() {
- batchData := p.batchBuilder.Flush()
+ batchData, sequenceId, callbacks := p.batchBuilder.Flush()
if batchData == nil {
return
}
+ p.pendingQueue.Put(&pendingItem{batchData, sequenceId, callbacks})
p.cnx.WriteData(batchData)
}
@@ -216,14 +235,30 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
return err
}
-type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
+func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error)) {
+ p.publishSemaphore.Acquire()
+ p.eventsChan <- &sendRequest{ctx, msg, callback}
}
-func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
- p.eventsChan <- &sendRequest{ctx, msg, callback}
+func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
+ pi := p.pendingQueue.Peek().(*pendingItem)
+
+ if pi == nil {
+ p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
+ return
+ } else if pi.sequenceId != response.GetSequenceId() {
+ p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
+ response.GetSequenceId(), pi.sequenceId)
+ return
+ }
+
+ // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
+ p.pendingQueue.Poll()
+ for _ ,i := range pi.sendRequest {
+ sr := i.(*sendRequest)
+ sr.callback(nil, sr.msg, nil)
+ }
}
func (p *partitionProducer) LastSequenceID() int64 {
@@ -239,3 +274,9 @@ func (p *partitionProducer) Close() error {
p.log.Info("Closing producer")
return nil
}
+
+type sendRequest struct {
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 12bc269..e987ad1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -82,7 +82,7 @@ func DeserializeMessageID(data []byte) MessageID {
}
var (
-// MessageID that points to the earliest message avaialable in a topic
+// MessageID that points to the earliest message available in a topic
// TODO: EarliestMessage MessageID = earliestMessageID()
// MessageID that points to the latest message