You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/24 05:31:18 UTC

[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002836092


##########
pulsar/consumer_impl.go:
##########
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_partition.go:
##########
@@ -698,8 +746,23 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		}
 	}
 
+	isChunkedMsg := false
+	if msgMeta.GetNumChunksFromMsg() > 1 {
+		isChunkedMsg = true
+	}
+
+	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)

Review Comment:
   ```suggestion
   	var processedPayloadBuffer internal.Buffer
   ```



##########
pulsar/consumer_impl.go:
##########
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_impl.go:
##########
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   This misses converting the `msgID` from the `MessageID` type to the `trackingMessageID` type; I'm not sure whether we need this.
   
   Why not use `messageID()`? What did I miss?
   
   



##########
pulsar/producer_partition.go:
##########
@@ -299,6 +278,26 @@ func (p *partitionProducer) grabCnx() error {
 	if err != nil {
 		return err
 	}
+
+	if p.options.DisableBatching {

Review Comment:
   The following code looks clearer:
   ```
   	if !p.options.DisableBatching {
   		batcherBuilderType := p.options.BatcherBuilderType
   		provider, err := GetBatcherBuilderProvider(batcherBuilderType)
   		if err != nil {
   			return err
   		}
   		maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
   		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
   			maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
   			compression.Level(p.options.CompressionLevel),
   			p,
   			p.log,
   			p.encryptor)
   		if err != nil {
   			return err
   		}
   	}
   ```



##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   Please revert this code (lines 170-174).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org