Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:52 UTC

[pulsar-client-go] 15/38: Handle cases with no-batching

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 9522dd53c29f25b2f5b86c4ec3fecc8c4955133e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 16:32:23 2019 -0700

    Handle cases with no-batching
---
 pulsar/impl/batch_builder.go      | 16 ++++++++--
 pulsar/impl/commands.go           |  2 +-
 pulsar/impl_partition_producer.go | 66 +++++++++++++++++++++++++--------------
 pulsar/producer.go                |  6 ++--
 4 files changed, 61 insertions(+), 29 deletions(-)

diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 7a16155..e8c0af8 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -61,8 +61,16 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 }
 
 func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint64, payload []byte,
-	callback interface{}) bool {
-	if bb.hasSpace(payload) {
+	callback interface{}, replicateTo []string) bool {
+	if replicateTo != nil && bb.numMessages != 0 {
+		// If the current batch is not empty and we're trying to set the replication clusters,
+		// then we need to force the current batch to flush and send the message individually
+		return false
+	} else if bb.msgMetadata.ReplicateTo != nil {
+		// There's already a message with a cluster replication list; we need to flush before the
+		// next message can be sent
+		return false
+	} else if bb.hasSpace(payload) {
 		// The current batch is full. Producer has to call Flush() to make room
 		return false
 	}
@@ -72,10 +80,11 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceId uint6
 		bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
 		bb.msgMetadata.SequenceId = proto.Uint64(sequenceId)
 		bb.msgMetadata.ProducerName = &bb.producerName
+		bb.msgMetadata.ReplicateTo = replicateTo
 
 		bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceId)
 	}
-	serializeSingleMessage(bb.buffer, metadata, payload)
+	addSingleMessageToBatch(bb.buffer, metadata, payload)
 
 	bb.numMessages += 1
 	bb.callbacks = append(bb.callbacks, callback)
@@ -86,6 +95,7 @@ func (bb *BatchBuilder) reset() {
 	bb.numMessages = 0
 	bb.buffer.Clear()
 	bb.callbacks = []interface{}{}
+	bb.msgMetadata.ReplicateTo = nil
 }
 
 func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks []interface{}) {
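
For readers skimming the diff: Add now refuses a message (returns false) whenever the caller must Flush first, either because the incoming message carries a replication-cluster list and the batch is not empty, because an earlier message already pinned ReplicateTo on the batch metadata, or because the batch is full. Below is a minimal, self-contained sketch of that contract; toyBatch and its message-count check are illustrative stand-ins, not the library's BatchBuilder.

    package main

    import "fmt"

    // toyBatch mimics the refusal rules that BatchBuilder.Add now applies; it is an
    // illustrative stand-in, not the library type.
    type toyBatch struct {
        numMessages int
        replicateTo []string // mirrors msgMetadata.ReplicateTo
        maxMessages int
    }

    // add reports whether the message was accepted; false means "flush, then retry".
    func (b *toyBatch) add(replicateTo []string) bool {
        switch {
        case replicateTo != nil && b.numMessages != 0:
            return false // a replication list only applies to a fresh batch
        case b.replicateTo != nil:
            return false // batch already pinned to a replication list
        case b.numMessages >= b.maxMessages:
            return false // batch is full (the real check is size-based)
        }
        b.replicateTo = replicateTo
        b.numMessages++
        return true
    }

    func (b *toyBatch) flush() {
        fmt.Printf("flushing %d message(s), replicateTo=%v\n", b.numMessages, b.replicateTo)
        b.numMessages = 0
        b.replicateTo = nil
    }

    func main() {
        b := &toyBatch{maxMessages: 100}
        messages := [][]string{nil, {"us-west"}, nil} // second message carries a replication list
        for i, clusters := range messages {
            for !b.add(clusters) {
                b.flush() // same flush-and-retry loop the partition producer uses
            }
            fmt.Printf("queued msg-%d\n", i)
        }
        b.flush()
    }

Running the sketch shows the second message ending up alone in its own flushed batch, which is exactly why Add returns false in those two new branches.
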
diff --git a/pulsar/impl/commands.go b/pulsar/impl/commands.go
index 6b1c840..08f8124 100644
--- a/pulsar/impl/commands.go
+++ b/pulsar/impl/commands.go
@@ -36,7 +36,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
 	return cmd
 }
 
-func serializeSingleMessage(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
+func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
 	serialized, err := proto.Marshal(smm)
 	if err != nil {
 		log.WithError(err).Fatal("Protobuf serialization error")
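
The rename makes the intent explicit: the helper appends one entry to the batch buffer rather than producing a standalone frame. A rough, self-contained sketch of that idea follows, assuming a length-prefixed layout (4-byte metadata length, serialized SingleMessageMetadata, then the payload); the exact byte layout is an assumption here and should be read from the real addSingleMessageToBatch body.

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    // appendBatchEntry is a hypothetical helper: it writes a 4-byte big-endian length
    // of the already-marshaled metadata, the metadata bytes, then the payload.
    func appendBatchEntry(buf *bytes.Buffer, metadataBytes, payload []byte) {
        var lenBuf [4]byte
        binary.BigEndian.PutUint32(lenBuf[:], uint32(len(metadataBytes)))
        buf.Write(lenBuf[:])
        buf.Write(metadataBytes)
        buf.Write(payload)
    }

    func main() {
        var batch bytes.Buffer
        appendBatchEntry(&batch, []byte("meta-0"), []byte("payload-0"))
        appendBatchEntry(&batch, []byte("meta-1"), []byte("payload-1"))
        fmt.Printf("batch buffer holds %d bytes for 2 entries\n", batch.Len())
    }
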
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 5f50296..d0cd8cf 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -158,6 +158,8 @@ func (p *partitionProducer) runEventsLoop() {
 				p.internalSend(v)
 			case *connectionClosed:
 				p.reconnectToBroker()
+			case *closeProducer:
+				p.internalClose(v)
 			}
 
 		case _ = <-p.batchFlushTicker.C:
@@ -179,30 +181,38 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
 	msg := request.msg
 
-	if msg.ReplicationClusters == nil {
-		smm := &pb.SingleMessageMetadata{
-			PayloadSize: proto.Int(len(msg.Payload)),
-		}
+	sendAsBatch := !p.options.DisableBatching && request.msg.ReplicationClusters == nil
+	smm := &pb.SingleMessageMetadata{
+		PayloadSize: proto.Int(len(msg.Payload)),
+	}
 
-		if msg.EventTime != nil {
-			smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
-		}
+	if msg.EventTime != nil {
+		smm.EventTime = proto.Uint64(impl.TimestampMillis(*msg.EventTime))
+	}
 
-		if msg.Key != "" {
-			smm.PartitionKey = &msg.Key
-		}
+	if msg.Key != "" {
+		smm.PartitionKey = &msg.Key
+	}
 
-		if msg.Properties != nil {
-			smm.Properties = impl.ConvertFromStringMap(msg.Properties)
-		}
+	if msg.Properties != nil {
+		smm.Properties = impl.ConvertFromStringMap(msg.Properties)
+	}
 
-		sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
-		for ; p.batchBuilder.Add(smm, sequenceId, msg.Payload, request) == false; {
+	sequenceId := impl.GetAndAdd(p.sequenceIdGenerator, 1)
+
+	if sendAsBatch {
+		for !p.batchBuilder.Add(smm, sequenceId, msg.Payload, request, msg.ReplicationClusters) {
 			// The current batch is full: flush it and retry
 			p.internalFlush()
 		}
 	} else {
-		p.log.Panic("TODO: serialize into single message")
+		// Send individually
+		p.batchBuilder.Add(smm, sequenceId, msg.Payload, request, msg.ReplicationClusters)
+		p.internalFlush()
+	}
+
+	if request.flushImmediately {
+		p.internalFlush()
 	}
 }
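
The net effect of this hunk: every message still goes through the batch builder, but it is only accumulated when batching is enabled and the message sets no replication clusters; otherwise it is added and flushed immediately, i.e. sent as a batch of one. The decision itself is just the sendAsBatch expression, restated here as a runnable sketch with a hypothetical helper name.

    package main

    import "fmt"

    // shouldAccumulate mirrors the sendAsBatch expression from internalSend:
    // accumulate only when batching is on and no replication clusters are set.
    func shouldAccumulate(disableBatching bool, replicationClusters []string) bool {
        return !disableBatching && replicationClusters == nil
    }

    func main() {
        fmt.Println(shouldAccumulate(false, nil))                 // true: join the current batch
        fmt.Println(shouldAccumulate(false, []string{"us-east"})) // false: send individually
        fmt.Println(shouldAccumulate(true, nil))                  // false: batching disabled
    }
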
 
@@ -228,10 +238,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 
 	var err error
 
-	p.SendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
+	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
 		err = e
 		wg.Done()
-	})
+	}, true)
 
 	// When sending synchronously we flush immediately to avoid
 	// the increased latency and reduced throughput of batching
@@ -246,7 +256,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
 	p.publishSemaphore.Acquire()
-	p.eventsChan <- &sendRequest{ctx, msg, callback}
+	p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+}
+
+func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
+	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+	p.publishSemaphore.Acquire()
+	p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -264,7 +280,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
 	p.pendingQueue.Poll()
 	p.publishSemaphore.Release()
-	for _ ,i := range pi.sendRequest {
+	for _, i := range pi.sendRequest {
 		sr := i.(*sendRequest)
 		sr.callback(nil, sr.msg, nil)
 	}
@@ -285,7 +301,11 @@ func (p *partitionProducer) Close() error {
 }
 
 type sendRequest struct {
-	ctx      context.Context
-	msg      *ProducerMessage
-	callback func(MessageID, *ProducerMessage, error)
+	ctx              context.Context
+	msg              *ProducerMessage
+	callback         func(MessageID, *ProducerMessage, error)
+	flushImmediately bool
 }
+
+type closeProducer struct {
+}
\ No newline at end of file
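
Send is now a thin wrapper over the asynchronous path: it registers a callback that records the error and releases a WaitGroup, and it requests an immediate flush so a synchronous caller does not sit out the batch delay. A self-contained sketch of that sync-over-async pattern, with generic names rather than the producer's internals:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // sendAsync simulates an asynchronous publish that completes on another goroutine.
    func sendAsync(payload []byte, flushImmediately bool, callback func(error)) {
        go func() {
            if !flushImmediately {
                time.Sleep(10 * time.Millisecond) // pretend we waited for the batch timer
            }
            callback(nil)
        }()
    }

    // sendSync blocks until the async callback fires, mirroring how Send wraps
    // internalSendAsync with flushImmediately=true.
    func sendSync(payload []byte) error {
        var wg sync.WaitGroup
        wg.Add(1)
        var err error
        sendAsync(payload, true, func(e error) {
            err = e
            wg.Done()
        })
        wg.Wait()
        return err
    }

    func main() {
        if err := sendSync([]byte("hello")); err != nil {
            fmt.Println("send failed:", err)
            return
        }
        fmt.Println("send acknowledged")
    }
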
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 0e59e66..8eaf340 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -130,7 +130,8 @@ type ProducerOptions struct {
 	// partition index where the message should be routed to
 	MessageRouter func(Message, TopicMetadata) int
 
-	// Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
+	// Control whether automatic batching of messages is enabled for the producer. By default batching
+	// is enabled.
 	//
 	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
 	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
@@ -138,7 +139,8 @@ type ProducerOptions struct {
 	// contents.
 	//
 	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
-	Batching bool
+	// Setting `DisableBatching: true` will make the producer send messages individually
+	DisableBatching bool
 
 	// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
 	// enabled. If set to a non zero value, messages will be queued until this time interval or until
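
With the flag flipped from Batching to DisableBatching, a zero-value ProducerOptions now means batching is on, and opting out is explicit. A hedged usage sketch follows; the NewClient/CreateProducer calls, the URL and Topic fields, and the topic name are assumptions about the surrounding client API rather than part of this commit, and per this diff Send returns only an error.

    package main

    import (
        "context"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // ClientOptions/NewClient are assumed to exist with this shape; check the
        // client package at this commit before relying on them.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic:           "my-topic", // hypothetical topic name
            DisableBatching: true,       // every Send goes out as an individual message
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // Per this commit, Send returns only an error.
        if err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte("hello"),
        }); err != nil {
            log.Fatal(err)
        }
    }
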