Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:52 UTC
[pulsar-client-go] 15/38: Handle cases with no-batching
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 9522dd53c29f25b2f5b86c4ec3fecc8c4955133e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 16:32:23 2019 -0700
Handle cases with no-batching
---
pulsar/impl/batch_builder.go | 16 ++++++++--
pulsar/impl/commands.go | 2 +-
pulsar/impl_partition_producer.go | 66 +++++++++++++++++++++++++--------------
pulsar/producer.go | 6 ++--
4 files changed, 61 insertions(+), 29 deletions(-)
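For context, the diff below makes the producer honor a disabled-batching mode: instead of panicking on the previously unimplemented path, each message is pushed through the batch builder as a single-entry batch and flushed right away. A minimal usage sketch follows; it assumes the client-construction API this repository eventually exposes (pulsar.NewClient / client.CreateProducer), which may still have been in flux at this commit:

package main

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar" // assumed import path
)

func main() {
    // Assumption: ClientOptions/NewClient match the eventual public API.
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // DisableBatching is the option renamed in this commit (see
    // pulsar/producer.go below); true means one message per send.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           "my-topic",
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // At this commit Send returns only an error (see
    // impl_partition_producer.go below).
    if err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("hello"),
    }); err != nil {
        log.Fatal(err)
    }
}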
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 7a16155..e8c0af8 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -61,8 +61,16 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
}
func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte,
- callback interface{}) bool {
- if bb.hasSpace(payload) {
+ callback interface{}, replicateTo []string) bool {
+ if replicateTo != nil && bb.numMessages != 0 {
+ // If the current batch is not empty and we're trying to set the replication clusters,
+ // then we need to force the current batch to flush and send the message individually
+ return false
+ } else if bb.msgMetadata.ReplicateTo != nil {
+ // There's already a message with a cluster replication list. We need to flush before the next
+ // message can be sent
+ return false
+ } else if bb.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to send it out
return false
}
@@ -72,10 +80,11 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint6
bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
bb.msgMetadata.SequenceId = proto.Uint64(sequenceId)
bb.msgMetadata.ProducerName = &bb.producerName
+ bb.msgMetadata.ReplicateTo = replicateTo
bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceId)
}
- serializeSingleMessage(bb.buffer, metadata, payload)
+ addSingleMessageToBatch(bb.buffer, metadata, payload)
bb.numMessages += 1
bb.callbacks = append(bb.callbacks, callback)
@@ -86,6 +95,7 @@ func (bb *BatchBuilder) reset() {
bb.numMessages = 0
bb.buffer.Clear()
bb.callbacks = []interface{}{}
+ bb.msgMetadata.ReplicateTo = nil
}
func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks []interface{}) {
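The contract of Add() after this change: it returns false whenever the caller must Flush() before the message can go in, either because the batch is full, or because a per-message replication-cluster list cannot be mixed into a non-empty batch, or because the open batch already carries a ReplicateTo list. A standalone sketch of that decision, with hypothetical fields standing in for the BatchBuilder internals:

package batch

// batchState is a hypothetical stand-in for the BatchBuilder fields the
// conditions above consult (numMessages, msgMetadata.ReplicateTo, hasSpace).
type batchState struct {
    numMessages int
    replicateTo []string // ReplicateTo already set on the open batch
    full        bool     // result of hasSpace(payload) in the real code
}

// canAdd mirrors the three "flush first" conditions from the diff.
// A false result tells the caller: Flush(), then retry the Add.
func canAdd(b *batchState, replicateTo []string) bool {
    if replicateTo != nil && b.numMessages != 0 {
        // A replication list can't be attached to a non-empty batch.
        return false
    }
    if b.replicateTo != nil {
        // The open batch already carries a replication list.
        return false
    }
    if b.full {
        // No room left; the batch has to be flushed.
        return false
    }
    return true
}

The partition producer (further below) drives this as a flush-and-retry loop.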
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 6b1c840..08f8124 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -36,7 +36,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
return cmd
}
-func serializeSingleMessage(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
+func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
serialized, err := proto.Marshal(smm)
if err != nil {
log.WithError(err).Fatal("Protobuf serialization error")
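The renamed addSingleMessageToBatch writes one batch entry in what, assuming the standard Pulsar batch-entry framing, amounts to: a 4-byte big-endian length of the serialized SingleMessageMetadata, the metadata bytes, then the raw payload. A self-contained sketch of that framing with the standard library (bytes.Buffer stands in for this repo's own Buffer abstraction):

package frame

import (
    "bytes"
    "encoding/binary"
)

// appendSingleMessage frames one batch entry as
// [4-byte metadata length][metadata][payload].
// metadata would be a proto.Marshal'ed pb.SingleMessageMetadata in the
// real code; here it is just opaque bytes.
func appendSingleMessage(buf *bytes.Buffer, metadata, payload []byte) {
    var size [4]byte
    binary.BigEndian.PutUint32(size[:], uint32(len(metadata)))
    buf.Write(size[:])
    buf.Write(metadata)
    buf.Write(payload)
}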
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 5f50296..d0cd8cf 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -158,6 +158,8 @@ func (p *partitionProducer) runEventsLoop() {
p.internalSend(v)
case *connectionClosed:
p.reconnectToBroker()
+ case *closeProducer:
+ p.internalClose(v)
}
case _ = <-p.batchFlushTicker.C:
@@ -179,30 +181,38 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg := request.msg
- if msg.ReplicationClusters == nil {
- smm := &pb.SingleMessageMetadata{
- PayloadSize: proto.Int(len(msg.Payload)),
- }
+ sendAsBatch := !p.options.DisableBatching && request.msg.ReplicationClusters == nil
+ smm := &pb.SingleMessageMetadata{
+ PayloadSize: proto.Int(len(msg.Payload)),
+ }
- if msg.EventTime != nil {
- smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
- }
+ if msg.EventTime != nil {
+ smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
+ }
- if msg.Key != "" {
- smm.PartitionKey = &msg.Key
- }
+ if msg.Key != "" {
+ smm.PartitionKey = &msg.Key
+ }
- if msg.Properties != nil {
- smm.Properties = impl.ConvertFromStringMap(msg.Properties)
- }
+ if msg.Properties != nil {
+ smm.Properties = impl.ConvertFromStringMap(msg.Properties)
+ }
- sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
- for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload, request) == false; {
+ sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
+
+ if sendAsBatch {
+ for !p.batchBuilder.Add(smm, sequenceId, msg.Payload, request, msg.ReplicationClusters) {
// The current batch is full. Flush it and retry
p.internalFlush()
}
} else {
- p.log.Panic("TODO: serialize into single message")
+ // Send individually
+ p.batchBuilder.Add(smm, sequenceId, msg.Payload, request, msg.ReplicationClusters)
+ p.internalFlush()
+ }
+
+ if request.flushImmediately {
+ p.internalFlush()
}
}
@@ -228,10 +238,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
var err error
- p.SendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
+ p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
err = e
wg.Done()
- })
+ }, true)
// When sending synchronously we flush immediately to avoid
// the increased latency and reduced throughput of batching
@@ -246,7 +256,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.publishSemaphore.Acquire()
- p.eventsChan <- &sendRequest{ctx, msg, callback}
+ p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+}
+
+func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+ p.publishSemaphore.Acquire()
+ p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -264,7 +280,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()
p.publishSemaphore.Release()
- for _ ,i := range pi.sendRequest {
+ for _, i := range pi.sendRequest {
sr := i.(*sendRequest)
sr.callback(nil, sr.msg, nil)
}
@@ -285,7 +301,11 @@ func (p *partitionProducer) Close() error {
}
type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+ flushImmediately bool
}
+
+type closeProducer struct {
+}
\ No newline at end of file
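Two patterns worth noting in this file: the synchronous Send is now layered over the asynchronous path via internalSendAsync with flushImmediately=true (skipping the batching delay for blocking callers), and producer close becomes an event (closeProducer) handled on the same loop as sends. A sketch of the sync-over-async pattern, with a hypothetical asyncSend standing in for internalSendAsync (MessageID and ProducerMessage are the package's existing types):

package pulsar

import (
    "context"
    "sync"
)

// sendSync blocks until the async callback fires, returning whatever
// error the broker acknowledgement path reported.
func sendSync(ctx context.Context, msg *ProducerMessage,
    asyncSend func(context.Context, *ProducerMessage,
        func(MessageID, *ProducerMessage, error), bool)) error {

    var wg sync.WaitGroup
    wg.Add(1)

    var err error
    // flushImmediately=true trades batching throughput for latency,
    // as the commit's comment on the synchronous path notes.
    asyncSend(ctx, msg, func(_ MessageID, _ *ProducerMessage, e error) {
        err = e
        wg.Done()
    }, true)

    wg.Wait()
    return err
}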
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 0e59e66..8eaf340 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -130,7 +130,8 @@ type ProducerOptions struct {
// partition index where the message should be routed to
MessageRouter func(Message, TopicMetadata) int
- // Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
+ // Control whether automatic batching of messages is enabled for the producer. By default batching
+ // is enabled.
//
// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch being sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
@@ -138,7 +139,8 @@ type ProducerOptions struct {
// contents.
//
// When enabled, the default batch delay is set to 1 ms and the default batch size is 1000 messages.
- Batching bool
+ // Setting `DisableBatching: true` will make the producer send messages individually
+ DisableBatching bool
// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
// enabled. If set to a non zero value, messages will be queued until this time interval or until