You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/23 20:04:46 UTC

[GitHub] [pulsar-client-go] zzzming commented on a diff in pull request #717: [WIP] [Issue 456] feat: support chunked msg

zzzming commented on code in PR #717:
URL: https://github.com/apache/pulsar-client-go/pull/717#discussion_r905393855


##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
+	chunkSize := int(p._getConn().GetMaxMessageSize())
+	totalChunks := (len(payload)+1)/chunkSize + 1
+	uuid := uuidGen.New().String()
+
+	for chunkId := 0; chunkId < chunkSize; chunkId++ {
+		left := chunkId * chunkSize
+		right := left + chunkSize
+		if right > len(payload)-1 {
+			right = len(payload) - 1
+		}
+		// [left, right)
+		p.internalSendSingleChunk(request, payload[left:right], uuid, totalChunks, len(payload), chunkId)
+	}
+}
+
+func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payload []byte,
+	uuid string, totalChunks int, totalSize int, chunkId int) {
+
+	msg := request.msg
+	mm := &pb.MessageMetadata{}
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
+
+	if msg.EventTime.UnixNano() != 0 {
+		mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if len(msg.OrderingKey) != 0 {
+		mm.OrderingKey = []byte(msg.OrderingKey)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
+	if msg.SequenceID != nil {
+		sequenceID := uint64(*msg.SequenceID)
+		mm.SequenceId = proto.Uint64(sequenceID)
+	}
+
+	// Fields required for chunked data
+	mm.Uuid = proto.String(uuid)
+	mm.NumChunksFromMsg = proto.Int(totalChunks)
+	mm.TotalChunkMsgSize = proto.Int(totalSize)
+	mm.ChunkId = proto.Int(chunkId)
+
+	// Directly construct a buffer and put it to the pending queue
+	newBuffer := p.GetBuffer()
+	internal.ConstructBufferFromMessage(newBuffer, mm, payload)
+
+	callbacks := make([]interface{}, 1)
+	callbacks[0] = request.callback
+
+	p.pendingQueue.Put(&pendingItem{
+		sentAt:       time.Now(),
+		batchData:    newBuffer,
+		sequenceID:   uint64(*msg.SequenceID),
+		sendRequests: callbacks,
+	})
+	p._getConn().WriteData(newBuffer)

Review Comment:
   When a batching is enabled, how does this flush work with batching? 
   One scenario is there is message already in a batch yet to be flushed upon batch requirements are fulfilled. There is a large message added requires chunking. Is this logic going to flush the chunk message ahead of previously batched message? 
   So do you need to flush the batch first before call individual chunking flushing? Probably add a logic like this before flush a chunk?
   ```
   batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
   if batchData != nil {
   p.pendingQueue.Put(&pendingItem{
   		sentAt:       time.Now(),
   		batchData:    batchData,
   		sequenceID:   sequenceID,
   		sendRequests: callbacks,
   	})
   	p._getConn().WriteData(batchData)}
   ```



##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
+	chunkSize := int(p._getConn().GetMaxMessageSize())
+	totalChunks := (len(payload)+1)/chunkSize + 1

Review Comment:
   The last `+1` may not be accurate. Java implementation has a more precise way to calculate the number of chunks 
   ```
    int totalChunks = canAddToBatch(msg) ? 1
                   : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
                           + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
   ```



##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
+	chunkSize := int(p._getConn().GetMaxMessageSize())
+	totalChunks := (len(payload)+1)/chunkSize + 1
+	uuid := uuidGen.New().String()
+
+	for chunkId := 0; chunkId < chunkSize; chunkId++ {
+		left := chunkId * chunkSize
+		right := left + chunkSize
+		if right > len(payload)-1 {
+			right = len(payload) - 1
+		}
+		// [left, right)

Review Comment:
   Can we make this as a separate function to return a slice of [ {trunk 0 left,right}, {trunk 1 left,right} ... ]. So that we can write a unit test to verify any number is missing from the splitting. WDYT?



##########
pulsar/internal/batch_builder.go:
##########
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
 	return true
 }
 
+func (bc *batchContainer) AddMessageMetaData(

Review Comment:
   I could not find any code calls this function. Who's supposed to call this function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org