You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/02 08:11:42 UTC

[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961383344


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {

Review Comment:
   Could we add some unit tests to test this method?



##########
pulsar/producer_partition.go:
##########
@@ -463,12 +465,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request)
+	p.log.Debug("Received send request: ", *request.msg)

Review Comment:
   Why changed here?



##########
pulsar/producer_partition.go:
##########
@@ -843,33 +1018,24 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		return
 	}
 
+	// bc only works when DisableBlockIfQueueFull is false
+	bc := make(chan struct{})
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
 		callback:         callback,
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
+		blockCh:          bc,
 	}
 	p.options.Interceptors.BeforeSend(p, msg)
 
-	if p.options.DisableBlockIfQueueFull {

Review Comment:
   If we remove these codes. It seems will be blocked at `p.eventsChan <- sr` if it reaches `maxPendingMessages` even if the DisableBlockIfQueueFull is true. Because the eventsChan is initialized with `make(chan interface{}, maxPendingMessages)`.



##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is not closed.

Review Comment:
   ```suggestion
   	// Chunking can not be enabled when batching is enabled.
   ```



##########
pulsar/producer_partition.go:
##########
@@ -246,42 +256,14 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 
-	var encryptor internalcrypto.Encryptor
+	//var encryptor internalcrypto.Encryptor

Review Comment:
   Please remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -717,6 +886,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 			for _, i := range pi.sendRequests {
 				sr := i.(*sendRequest)
 				if sr.msg != nil {
+					// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {
+	mm = &pb.MessageMetadata{
+		ProducerName:     &p.producerName,
+		PublishTime:      proto.Uint64(internal.TimestampMillis(time.Now())),
+		ReplicateTo:      msg.ReplicationClusters,
+		UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
 	}
+	if deliverAt.UnixNano() > 0 {
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	return
+}
 
-	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Is this method only used for batch sending messages? Not the normal message(Neither chunking or batching)?



##########
pulsar/producer_partition.go:
##########
@@ -827,6 +998,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
 
 	// wait for send request to finish
 	<-doneCh
+
+	// handle internal error
+	p.internalErrHandle(err)

Review Comment:
   Why do we need to introduce the internal error handler here? Can we handle this error inside the sending operation?



##########
pulsar/producer_partition.go:
##########
@@ -911,6 +1077,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
 				p.metrics.MessagesPublished.Inc()
 				p.metrics.MessagesPending.Dec()
+				// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org