You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/12/14 00:03:00 UTC

[GitHub] [pulsar-client-go] tuteng commented on a change in pull request #124: [Issue #123] Ensure message is sent if no room in current batch.

tuteng commented on a change in pull request #124: [Issue #123] Ensure message is sent if no room in current batch.
URL: https://github.com/apache/pulsar-client-go/pull/124#discussion_r357876188
 
 

 ##########
 File path: pulsar/producer_partition.go
 ##########
 @@ -256,10 +256,17 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 
 	if sendAsBatch {
-		ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
-		if ok == false {
+		added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if !added {
 			// The current batch is full.. flush it and retry
 			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
+				p.log.WithField("size", len(msg.Payload)).
+					WithField("sequenceID", sequenceID).
+					Error("unable to send message")
+			}
 		}
 	} else {
 		// Send individually
 
 Review comment:
   I think the same problem may exist here if false is returned when the `Add` function (https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/batch_builder.go#L107) is executed:
   
   ```
   // Add appends a single message to the current batch.
   //
   // It returns true when the message was added. It returns false, leaving the
   // batch untouched, when the caller has to flush the current batch first:
   //   - replicateTo is set and the batch already holds messages,
   //   - the batch metadata already carries a replication list, or
   //   - hasSpace reports no room for payload (the batch is considered full).
   //
   // callback is recorded in bb.callbacks alongside the added message.
   func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
   	callback interface{}, replicateTo []string) bool {
   	if replicateTo != nil && bb.numMessages != 0 {
   		// If the current batch is not empty and we're trying to set the replication clusters,
   		// then we need to force the current batch to flush and send the message individually
   		return false
   	}
   	if bb.msgMetadata.ReplicateTo != nil {
   		// There's already a message with cluster replication list. need to flush before next
   		// message can be sent
   		return false
   	}
   	if !bb.hasSpace(payload) {
   		// The current batch is full. Producer has to call Flush() before
   		// this message can be added.
   		return false
   	}

   	if bb.numMessages == 0 {
   		// First message of the batch: initialize the batch-level metadata.
   		bb.msgMetadata.SequenceId = proto.Uint64(sequenceID)
   		bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
   		bb.msgMetadata.ProducerName = &bb.producerName
   		bb.msgMetadata.ReplicateTo = replicateTo

   		bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
   	}
   	addSingleMessageToBatch(bb.buffer, metadata, payload)

   	bb.numMessages++
   	bb.callbacks = append(bb.callbacks, callback)
   	return true
   }
   ```
   
    In that case the message will not be retried, resulting in incorrect processing. Therefore, I think a retry should also be added after internalFlushCurrentBatch under this condition.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services