Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/10 08:02:15 UTC

[GitHub] [pulsar-client-go] Gleiphir2769 opened a new pull request, #805: [Issue 456] Support chunking to produce big messages.

Gleiphir2769 opened a new pull request, #805:
URL: https://github.com/apache/pulsar-client-go/pull/805

   ### Contribution Checklist
   
   Master Issue: [#456](https://github.com/apache/pulsar-client-go/issues/456)
   
   ### Motivation
   
   Make the Pulsar Go client support chunking so that it can produce big messages. The earlier implementation ([#717](https://github.com/apache/pulsar-client-go/pull/717)) overlooked many details, so I decided to reimplement it.
   
   ### Modifications
   
   - Add `internalSingleSend` to send a message without batching, because a batched message cannot be received as chunks.
   - Move the `BlockIfQueueFull` check from `internalSendAsync` to `internalSend` (`canAddQueue`) so that blocking still works correctly when chunking.
   - Make `partitionProducer` send big messages by chunking.
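
To illustrate the chunking idea, here is a minimal sketch of how a payload could be cut into chunks of bounded size. The helper name `splitIntoChunks` is hypothetical and is not taken from this PR; the real producer also has to account for metadata size and compression, which this sketch ignores.

```go
package main

import "fmt"

// splitIntoChunks is a hypothetical helper, not part of pulsar-client-go.
// It cuts payload into consecutive slices of at most chunkSize bytes.
func splitIntoChunks(payload []byte, chunkSize int) [][]byte {
	if chunkSize <= 0 {
		return nil
	}
	// Ceiling division; an empty payload still yields one (empty) chunk.
	total := (len(payload) + chunkSize - 1) / chunkSize
	if total == 0 {
		total = 1
	}
	chunks := make([][]byte, 0, total)
	for i := 0; i < total; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if end > len(payload) {
			end = len(payload)
		}
		// Each chunk is a view into the original payload, not a copy.
		chunks = append(chunks, payload[start:end])
	}
	return chunks
}

func main() {
	chunks := splitIntoChunks(make([]byte, 50), 20)
	for i, c := range chunks {
		fmt.Printf("chunk %d of %d: %d bytes\n", i+1, len(chunks), len(c))
	}
}
```

With a 50-byte payload and a 20-byte limit this yields three chunks of 20, 20, and 10 bytes.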
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   - Add `TestProducerChunking` to verify that big messages are sent by chunking.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency):  no
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? not yet
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r969321432


##########
pulsar/consumer_impl.go:
##########
@@ -579,12 +589,13 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if msgID.PartitionIdx() < 0 || int(msgID.PartitionIdx()) >= len(c.consumers) {

Review Comment:
   Same comment as above.



##########
pulsar/consumer_impl.go:
##########
@@ -533,17 +545,15 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	partition := int(msgID.PartitionIdx())

Review Comment:
   Same comment as above.



##########
pulsar/consumer_impl.go:
##########
@@ -453,16 +465,16 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	partition := int(msgID.PartitionIdx())

Review Comment:
   Why not make a method to check the `partition` or the `msgID`?
   
   You can improve the `messageID()` method.



##########
pulsar/consumer_partition.go:
##########
@@ -1444,3 +1564,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.chunkedMsgIDs[0]

Review Comment:
   ```suggestion
   	if len(c.chunkedMsgIDs) == 0 { return messageID{} } else { return c.chunkedMsgIDs[0] }
   ```





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r985364970


##########
pulsar/impl_message.go:
##########
@@ -372,3 +386,28 @@ func (t *ackTracker) completed() bool {
 	defer t.Unlock()
 	return len(t.batchIDs.Bits()) == 0
 }
+
+type chunkMessageID struct {
+	messageID
+
+	firstChunkID messageID
+	receivedTime time.Time
+
+	consumer acker
+}
+
+func newChunkMessageID(firstChunkID messageID, lastChunkID messageID) chunkMessageID {
+	return chunkMessageID{
+		messageID:    lastChunkID,
+		firstChunkID: firstChunkID,
+		receivedTime: time.Now(),
+	}
+}
+
+func (id chunkMessageID) String() string {
+	return fmt.Sprintf("%s;%s", id.firstChunkID.String(), id.messageID.String())
+}
+
+func (id chunkMessageID) Serialize() []byte {
+	return id.firstChunkID.Serialize()

Review Comment:
   Done, Thx.



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	totalProducers := 5
+	producers := make([]Producer, 0, 20)
+	defer func() {
+		for _, p := range producers {
+			p.Close()
+		}
+	}()
+
+	clients := make([]Client, 0, 20)
+	defer func() {
+		for _, c := range clients {
+			c.Close()
+		}
+	}()
+
+	for j := 0; j < totalProducers; j++ {
+		pc, err := NewClient(ClientOptions{
+			URL: lookupURL,
+		})
+		assert.Nil(t, err)
+		clients = append(clients, pc)
+		producer, err := pc.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: 10,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, producer)
+		producers = append(producers, producer)
+	}
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                    topic,
+		Type:                     Exclusive,
+		SubscriptionName:         "chunk-subscriber",
+		MaxPendingChunkedMessage: 1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	totalMsgs := 40
+	wg := sync.WaitGroup{}
+	wg.Add(totalMsgs * totalProducers)
+	for i := 0; i < totalMsgs; i++ {
+		for j := 0; j < totalProducers; j++ {
+			p := producers[j]
+			go func() {
+				ID, err := p.Send(context.Background(), &ProducerMessage{
+					Payload: createTestMessagePayload(50),
+				})
+				assert.NoError(t, err)
+				assert.NotNil(t, ID)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+
+	received := 0
+	for i := 0; i < totalMsgs*totalProducers; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		if msg == nil || (err != nil && errors.Is(err, context.DeadlineExceeded)) {
+			break
+		}
+
+		received++
+
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	c, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		Type:                        Exclusive,
+		SubscriptionName:            "chunk-subscriber",
+		ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	uuid := "test-uuid"
+	chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+	chunkCtxMap.addIfAbsent(uuid, 2, 100)
+	ctx := chunkCtxMap.get(uuid)
+	assert.NotNil(t, ctx)
+
+	time.Sleep(400 * time.Millisecond)
+
+	ctx = chunkCtxMap.get(uuid)
+	assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		EnableChunking:          true,
+		DisableBatching:         true,
+		MaxPendingMessages:      10,
+		ChunkMaxMessageSize:     50,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1000),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	totalMessages := 5
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "default-seek",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	msgIDs := make([]MessageID, 0)
+	for i := 0; i < totalMessages; i++ {
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(100),
+		})
+		assert.NoError(t, err)
+		msgIDs = append(msgIDs, ID)
+	}
+
+	for i := 0; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	err = consumer.Seek(msgIDs[1])
+	assert.NoError(t, err)
+
+	for i := 1; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}

Review Comment:
   I think `startMessageIdInclusive` should be added in another PR, because it is unrelated to the chunking implementation.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961499351


##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is not closed.

Review Comment:
   OK.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961441881


##########
pulsar/producer_partition.go:
##########
@@ -843,33 +1018,24 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		return
 	}
 
+	// bc only works when DisableBlockIfQueueFull is false
+	bc := make(chan struct{})
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
 		callback:         callback,
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
+		blockCh:          bc,
 	}
 	p.options.Interceptors.BeforeSend(p, msg)
 
-	if p.options.DisableBlockIfQueueFull {

Review Comment:
   Yes, you're right. I will change it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by "Gleiphir2769 (via GitHub)" <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1538335263

   > Does the chunking work on shared subscription introduced in Pulsar 2.11
   
   Hi @maraiskruger1980, this PR was finished before Pulsar 2.11 was released, so it doesn't support chunking on shared subscriptions.
   
   I think I can spend some time on it. You're welcome to follow the progress.




[GitHub] [pulsar-client-go] maraiskruger1980 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by "maraiskruger1980 (via GitHub)" <gi...@apache.org>.
maraiskruger1980 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1538408207

   That will be great if it can support shared subscription




[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by "Gleiphir2769 (via GitHub)" <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1626893190

   > That will be great if it can support shared subscription
   
   Hi @maraiskruger1980. I checked, and the consumer imposes no limit when the subscription is `shared`. This means you can safely consume chunked messages on a `shared` subscription if your Pulsar version is >= 2.11.




[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r979344780


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set MaxChunkSize
+		if p.options.MaxChunkSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
 	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {

Review Comment:
   Enabling `DisableBlockIfQueueFull` may not be a good idea. I fixed this problem by moving `canAddToQueue` to just before the real send. This ensures that the semaphore permits acquired for chunking can be released in time. But it also makes sending a single (non-batched) message waste more CPU, because the message is compressed before it either waits for permits or is discarded.
   What do you think?
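
The permit handling discussed here can be sketched as follows. This is an illustration only, not the PR's `canAddToQueue`: the `permits` type and `reserveForChunks` function are hypothetical, and the sketch reserves one permit per chunk in a single step, whereas the diff acquires `totalChunks-1` extra permits on top of the one already held.

```go
package main

import "fmt"

// permits is a hypothetical counting semaphore backed by a buffered channel.
type permits chan struct{}

// tryAcquire takes one permit without blocking.
func (p permits) tryAcquire() bool {
	select {
	case p <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns one previously acquired permit.
func (p permits) release() { <-p }

// reserveForChunks tries to take one permit per chunk. If the queue fills up
// part-way, it gives back what it took so that no permit is leaked.
func reserveForChunks(p permits, totalChunks int) bool {
	for i := 0; i < totalChunks; i++ {
		if !p.tryAcquire() {
			for j := 0; j < i; j++ {
				p.release()
			}
			return false
		}
	}
	return true
}

func main() {
	queue := make(permits, 10) // plays the role of MaxPendingMessages: 10
	fmt.Println(reserveForChunks(queue, 20), len(queue))
	fmt.Println(reserveForChunks(queue, 5), len(queue))
}
```

The first call fails and leaves the queue empty, which corresponds to the situation `TestChunksEnqueueFailed` exercises: a 1000-byte payload with 50-byte chunks needs 20 slots, but only 10 are available.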





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983291109


##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	ctx, ok := c.chunkedMsgCtxs[uuid]
+	if !ok {
+		return
+	}
+	if autoAck {
+		ctx.discard(c.pc)
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+}
+
+func (c *chunkedMsgCtxMap) removeChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
+	timer := time.NewTimer(expire)
+	<-timer.C
+	c.removeChunkMessage(uuid, autoAck)
+}
+
+func (c *chunkedMsgCtxMap) Close() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.closed = true
+}
+
+type unAckChunksTracker struct {
+	chunkIDs map[chunkMessageID][]messageID
+	pc       *partitionConsumer
+	mu       sync.Mutex
+}
+
+func newUnAckChunksTracker(pc *partitionConsumer) *unAckChunksTracker {
+	return &unAckChunksTracker{
+		chunkIDs: make(map[chunkMessageID][]messageID),
+		pc:       pc,
+	}
+}
+
+func (u *unAckChunksTracker) add(cmid chunkMessageID, ids []messageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	u.chunkIDs[cmid] = ids
+}
+
+func (u *unAckChunksTracker) get(cmid chunkMessageID) []messageID {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	return u.chunkIDs[cmid]
+}
+
+func (u *unAckChunksTracker) remove(cmid chunkMessageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	delete(u.chunkIDs, cmid)
+}
+
+func (u *unAckChunksTracker) ack(cmid chunkMessageID) error {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		if err := u.pc.AckID(id); err != nil {
+			return err
+		}
+	}
+	u.remove(cmid)
+	return nil
+}
+
+func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		u.pc.nackTracker.Add(id)
+		u.pc.metrics.NacksCounter.Inc()

Review Comment:
   > I think it's better to call u.pc.NackID here. 
   
   My thought at the time was to avoid the two unnecessary type assertions in `u.pc.NackID`.
   
   > we need to maintain in two places.
   
   But I agree with you that saving two type assertions is less beneficial than keeping the code concise.
   
   I will fix it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1279882470

   This PR will also fix issue #447.




[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1003030610


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   > Good catch, but an inline expression looks clearer, like this:
   > 
   > ```
   > return bc.numMessages+1 <= bc.maxMessages && expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
   > ```
   
   Done, thx.
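
A minimal sketch of the check agreed on above, assuming a simplified, hypothetical `batch` struct in place of the real `batchContainer` (the field `bufferedBytes` stands in for `bc.buffer.ReadableBytes()`). It only illustrates the three-way comparison and omits the early return visible at the top of the hunk:

```go
package main

import "fmt"

// batch is a hypothetical, simplified stand-in for batchContainer.
type batch struct {
	numMessages    uint
	maxMessages    uint
	bufferedBytes  uint32 // plays the role of buffer.ReadableBytes()
	maxBatchSize   uint
	maxMessageSize uint32
}

// hasSpace reports whether payload fits under the message-count limit,
// the batch-size limit, and the maximum message size.
func (b *batch) hasSpace(payload []byte) bool {
	expectedSize := b.bufferedBytes + uint32(len(payload))
	return b.numMessages+1 <= b.maxMessages &&
		expectedSize <= uint32(b.maxBatchSize) &&
		expectedSize <= b.maxMessageSize
}

func main() {
	b := &batch{
		numMessages:    1,
		maxMessages:    10,
		bufferedBytes:  900,
		maxBatchSize:   2048,
		maxMessageSize: 1024,
	}
	fmt.Println(b.hasSpace(make([]byte, 100))) // 1000 bytes: within both size limits
	fmt.Println(b.hasSpace(make([]byte, 200))) // 1100 bytes: exceeds maxMessageSize
}
```

With these illustrative numbers the batch-size limit alone would accept the second payload; the added `maxMessageSize` comparison is what rejects it.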





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r966726634


##########
pulsar/consumer.go:
##########
@@ -189,6 +189,17 @@ type ConsumerOptions struct {
 	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
 	// Default: false
 	AckWithResponse bool
+
+	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
+	MaxPendingChunkedMessage int
+
+	// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message.
+	// The timing accuracy is 100ms level and the default value is 60 * 1000 millis

Review Comment:
   OK, I will add it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973556288


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)

Review Comment:
   You're right. It seems some of the original code does not release `publishSemaphore` either:
   https://github.com/apache/pulsar-client-go/blob/edd5c71651b79bd35358a51ae3925905ed9f17e1/pulsar/producer_partition.go#L484-L490
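
One way to avoid missing a release is to route every error return through a single helper. The sketch below is only an illustration under stated assumptions: `semaphore`, `send`, and the size arguments are hypothetical, and the success path releases the permit immediately, whereas a real producer would hold it until the broker acknowledges the message.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errMetaTooLarge    = errors.New("message metadata too large")
	errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
)

// semaphore is a hypothetical counting semaphore built on a buffered channel.
type semaphore chan struct{}

func (s semaphore) Acquire() { s <- struct{}{} }
func (s semaphore) Release() { <-s }

// send acquires a permit and guarantees that every error path gives it back.
func send(sem semaphore, payloadSize, metaSize, maxSize int, callback func(error)) {
	sem.Acquire()

	// fail is the single exit for error paths: release first, then report.
	fail := func(err error) {
		sem.Release()
		callback(err)
	}

	if metaSize >= maxSize {
		fail(errMetaTooLarge)
		return
	}
	if payloadSize+metaSize > maxSize {
		fail(errMessageTooLarge)
		return
	}

	// Simplification: release right away instead of waiting for a receipt.
	sem.Release()
	callback(nil)
}

func main() {
	// With a single permit, one leaked permit would block the next call.
	sem := make(semaphore, 1)
	report := func(err error) { fmt.Println("callback:", err) }

	send(sem, 10, 200, 100, report) // metadata alone is too large
	send(sem, 90, 20, 100, report)  // payload plus metadata is too large
	send(sem, 10, 20, 100, report)  // fits
	fmt.Println("permits in use:", len(sem))
}
```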
   





[GitHub] [pulsar-client-go] RobertIndie merged pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie merged PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805




[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1003007100


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   @nodece This PR introduces `maxMessageSize`, so I think these changes belong in this PR.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973601872


##########
pulsar/producer_partition.go:
##########
@@ -745,28 +933,34 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-	batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
+	batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()

Review Comment:
   `errors.Is()` is used to handle the internal error (L950). The original variable name `errors` conflicts with the `errors` package.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r970266596


##########
pulsar/consumer_partition.go:
##########
@@ -1444,3 +1564,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.chunkedMsgIDs[0]

Review Comment:
   Good suggestion. The original code could cause an index-out-of-range error.
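
A minimal sketch of the guard, assuming a trimmed-down `chunkedMsgCtx` and a hypothetical two-field `messageID`; the real types carry more state. Returning the zero value when no chunk IDs are stored avoids indexing an empty slice:

```go
package main

import (
	"fmt"
	"sync"
)

// messageID is a hypothetical, simplified message ID.
type messageID struct {
	ledgerID int64
	entryID  int64
}

// chunkedMsgCtx keeps only the field needed for this illustration.
type chunkedMsgCtx struct {
	mu            sync.Mutex
	chunkedMsgIDs []messageID
}

// firstChunkID returns the first stored chunk ID, or the zero value
// when no chunk has been recorded yet.
func (c *chunkedMsgCtx) firstChunkID() messageID {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.chunkedMsgIDs) == 0 {
		return messageID{}
	}
	return c.chunkedMsgIDs[0]
}

func main() {
	empty := &chunkedMsgCtx{}
	fmt.Println(empty.firstChunkID() == messageID{}) // zero value, no panic

	filled := &chunkedMsgCtx{chunkedMsgIDs: []messageID{{ledgerID: 7, entryID: 3}}}
	fmt.Println(filled.firstChunkID().entryID)
}
```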





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961495238


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {
+	mm = &pb.MessageMetadata{
+		ProducerName:     &p.producerName,
+		PublishTime:      proto.Uint64(internal.TimestampMillis(time.Now())),
+		ReplicateTo:      msg.ReplicationClusters,
+		UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
 	}
+	if deliverAt.UnixNano() > 0 {
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	return
+}
 
-	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Yes, chunked and single messages do not need smm (`SingleMessageMetadata`); it is only used for batched messages.
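
For context, the non-batch path in the hunk above slices the compressed payload into pieces of at most `payloadChunkSize` bytes. The sketch below reproduces only that slicing arithmetic; `splitChunks` is a hypothetical helper, and integer ceiling division replaces the `math.Ceil` call used in the diff.

```go
package main

import "fmt"

// splitChunks cuts payload into consecutive pieces of at most chunkSize bytes.
// It returns nil when chunkSize is not positive, the case the diff reports
// as errMetaTooLarge.
func splitChunks(payload []byte, chunkSize int) [][]byte {
	if chunkSize <= 0 {
		return nil
	}
	// Integer ceiling of len(payload)/chunkSize, with a minimum of one chunk.
	total := (len(payload) + chunkSize - 1) / chunkSize
	if total < 1 {
		total = 1
	}
	chunks := make([][]byte, 0, total)
	for chunkID := 0; chunkID < total; chunkID++ {
		lhs := chunkID * chunkSize
		rhs := lhs + chunkSize
		if rhs > len(payload) {
			rhs = len(payload) // the last chunk may be shorter
		}
		chunks = append(chunks, payload[lhs:rhs])
	}
	return chunks
}

func main() {
	even := splitChunks(make([]byte, 50), 5)
	fmt.Println(len(even)) // prints 10

	uneven := splitChunks(make([]byte, 12), 5)
	fmt.Println(len(uneven), len(uneven[2])) // prints 3 2
}
```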





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983291109


##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	ctx, ok := c.chunkedMsgCtxs[uuid]
+	if !ok {
+		return
+	}
+	if autoAck {
+		ctx.discard(c.pc)
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+}
+
+func (c *chunkedMsgCtxMap) removeChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
+	timer := time.NewTimer(expire)
+	<-timer.C
+	c.removeChunkMessage(uuid, autoAck)
+}
+
+func (c *chunkedMsgCtxMap) Close() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.closed = true
+}
+
+type unAckChunksTracker struct {
+	chunkIDs map[chunkMessageID][]messageID
+	pc       *partitionConsumer
+	mu       sync.Mutex
+}
+
+func newUnAckChunksTracker(pc *partitionConsumer) *unAckChunksTracker {
+	return &unAckChunksTracker{
+		chunkIDs: make(map[chunkMessageID][]messageID),
+		pc:       pc,
+	}
+}
+
+func (u *unAckChunksTracker) add(cmid chunkMessageID, ids []messageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	u.chunkIDs[cmid] = ids
+}
+
+func (u *unAckChunksTracker) get(cmid chunkMessageID) []messageID {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	return u.chunkIDs[cmid]
+}
+
+func (u *unAckChunksTracker) remove(cmid chunkMessageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	delete(u.chunkIDs, cmid)
+}
+
+func (u *unAckChunksTracker) ack(cmid chunkMessageID) error {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		if err := u.pc.AckID(id); err != nil {
+			return err
+		}
+	}
+	u.remove(cmid)
+	return nil
+}
+
+func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		u.pc.nackTracker.Add(id)
+		u.pc.metrics.NacksCounter.Inc()

Review Comment:
   > I think it's better to call u.pc.NackID here. 
   
   My thought at the time was to avoid the two unnecessary type checks in `u.pc.NackID`.
   
   > we need to maintain in two places.
   
   But I agree with you that saving two type checks matters less than keeping the code concise.
   
   I will fix it.
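
A rough sketch of the trade-off discussed above, using hypothetical stand-in types rather than the client's real ones. The tracker hands each chunk ID to a single `nackID` method, so the nack bookkeeping and the counter update live in one place instead of two:

```go
package main

import "fmt"

// messageID is a hypothetical stand-in for the client's message ID type.
type messageID struct {
	ledgerID int64
	entryID  int64
}

// consumer is a hypothetical stand-in for partitionConsumer.
type consumer struct {
	nacked    []messageID
	nackCount int
}

// nackID is the only place that records a nack and bumps the counter.
func (c *consumer) nackID(id messageID) {
	c.nacked = append(c.nacked, id)
	c.nackCount++
}

// chunkTracker maps the ID of a chunked message to the IDs of its chunks.
type chunkTracker struct {
	chunkIDs map[messageID][]messageID
	c        *consumer
}

// nack delegates every chunk to consumer.nackID instead of repeating
// the tracker and metrics updates here.
func (t *chunkTracker) nack(cmid messageID) {
	for _, id := range t.chunkIDs[cmid] {
		t.c.nackID(id)
	}
}

func main() {
	c := &consumer{}
	whole := messageID{ledgerID: 1, entryID: 2}
	tracker := &chunkTracker{
		chunkIDs: map[messageID][]messageID{
			whole: {{ledgerID: 1, entryID: 0}, {ledgerID: 1, entryID: 1}, {ledgerID: 1, entryID: 2}},
		},
		c: c,
	}
	tracker.nack(whole)
	fmt.Println(c.nackCount) // prints 3
}
```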





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1288319834

   /pulsarbot run-failure-checks




[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002998935


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   Good catch, but an inline expression looks clearer, like this:
   ```
   return bc.numMessages+1 <= bc.maxMessages && expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
   ```



##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   @Gleiphir2769 





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983546515


##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   Yes, the biggest challenge is that it is difficult to stop the broker (`stopBroker`) here, so `TestExpireIncompleteChunks` does not exercise a "real" expiry.
   
   Do you think such a test has enough coverage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r970254250


##########
pulsar/consumer_impl.go:
##########
@@ -533,17 +545,15 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	partition := int(msgID.PartitionIdx())

Review Comment:
   Same as above



##########
pulsar/consumer_impl.go:
##########
@@ -579,12 +589,13 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if msgID.PartitionIdx() < 0 || int(msgID.PartitionIdx()) >= len(c.consumers) {

Review Comment:
   Same as above





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973556288


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)

Review Comment:
   You're right. It also seems that some places in the original code did not release `publishSemaphore`:
   https://github.com/apache/pulsar-client-go/blob/edd5c71651b79bd35358a51ae3925905ed9f17e1/pulsar/producer_partition.go#L484-L490
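
The leak discussed in this comment can be illustrated with a small sketch. This is not the client's code: `permits`, `send`, and the local `errMetaTooLarge` are hypothetical stand-ins chosen for illustration. The only point is that once a permit has been acquired, every early return must give it back.

```go
package main

import (
	"errors"
	"fmt"
)

// permits is a hypothetical counting semaphore backed by a buffered channel.
type permits chan struct{}

func (p permits) Acquire()   { p <- struct{}{} }
func (p permits) Release()   { <-p }
func (p permits) InUse() int { return len(p) }

// errMetaTooLarge is a local stand-in for the error used in the diff.
var errMetaTooLarge = errors.New("message metadata too large")

// send acquires a permit and releases it on the failure path. On success
// the permit stays held, as it would until the broker acknowledges the message.
func send(p permits, metadataSize, maxMessageSize int) error {
	p.Acquire()
	if maxMessageSize-metadataSize <= 0 {
		p.Release() // without this call the permit leaks on the error path
		return errMetaTooLarge
	}
	return nil
}

func main() {
	p := make(permits, 2)
	err := send(p, 10, 5)
	fmt.Println(err, p.InUse())
}
```

In a longer function, a single cleanup helper that releases the permit and invokes the callback may be less error-prone than repeating both calls before each `return`.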
   





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r974259601


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set MaxChunkSize
+		if p.options.MaxChunkSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
 	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {

Review Comment:
   I think `BlockIfQueueFull` should not be enabled together with chunking. When chunking is enabled, the send path acquires more than one permit (one per chunk). If it blocks while waiting for the remaining permits, it may never be able to release the ones it has already acquired. I have created a PR here: https://github.com/apache/pulsar/pull/17447.
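
To make the concern concrete, here is a minimal sketch of an all-or-nothing permit reservation. It is an illustration under assumptions, not the client's implementation and not necessarily the approach taken in the linked PR: `chunkPermits` and `tryAcquireN` are hypothetical names. A message that needs more permits than the queue can ever provide is rejected instead of holding a partial set forever.

```go
package main

import "fmt"

// chunkPermits is a hypothetical stand-in for the producer's publish semaphore.
type chunkPermits chan struct{}

// tryAcquireN reserves n permits without blocking. If fewer than n are
// available it rolls back whatever it took and reports false, so a chunked
// message never sits on a partial set of permits.
func (p chunkPermits) tryAcquireN(n int) bool {
	for taken := 0; taken < n; taken++ {
		select {
		case p <- struct{}{}:
		default:
			// Not enough room: return the permits taken so far.
			for ; taken > 0; taken-- {
				<-p
			}
			return false
		}
	}
	return true
}

func main() {
	p := make(chunkPermits, 3)            // room for 3 pending messages
	fmt.Println(p.tryAcquireN(5), len(p)) // 5 chunks can never fit
	fmt.Println(p.tryAcquireN(2), len(p)) // 2 chunks fit
}
```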





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002969571


##########
pulsar/producer_partition.go:
##########
@@ -299,6 +278,26 @@ func (p *partitionProducer) grabCnx() error {
 	if err != nil {
 		return err
 	}
+
+	if p.options.DisableBatching {

Review Comment:
   Good suggestion, thx.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r963695252


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {

Review Comment:
   Related issue: https://github.com/apache/pulsar/pull/16196/files#
   In the Java client, the payload chunk size is not calculated correctly, for two reasons:
   1. The MessageMetadata is updated after the payload chunk size is computed, so the actual metadata size can be greater.
   2. OpSendMsg#getMessageHeaderAndPayloadSize doesn't exclude all bytes other than the metadata and payload, e.g. the 4-byte checksum field.
   
   Firstly, this PR takes care of the order in which the `messageMetadata` fields are set, so that `totalChunks` is calculated correctly. Secondly, since the Go client writes the data directly into the write buffer instead of creating a data buffer for `OpSendMsg` in advance, it doesn't have the problem described in reason 2.
   
   By the way, the unit test for `chunkSize` has been added to message_chunking_test.go.
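
The chunk arithmetic from the diff can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: `chunkPlan` is a hypothetical helper, it uses integer ceiling division instead of `math.Ceil`, and it assumes `metadataSize` is measured after all metadata fields (including the sequence ID) have been set, which is the ordering concern raised above.

```go
package main

import "fmt"

// chunkPlan returns the per-chunk payload size and the number of chunks.
// ok is false when the metadata alone leaves no room for any payload.
func chunkPlan(compressedSize, maxMessageSize, metadataSize, chunkMaxMessageSize int) (chunkSize, totalChunks int, ok bool) {
	chunkSize = maxMessageSize - metadataSize
	if chunkSize <= 0 {
		return 0, 0, false
	}
	// An explicit ChunkMaxMessageSize can only shrink the chunk size.
	if chunkMaxMessageSize != 0 && chunkMaxMessageSize < chunkSize {
		chunkSize = chunkMaxMessageSize
	}
	totalChunks = (compressedSize + chunkSize - 1) / chunkSize // ceiling division
	if totalChunks < 1 {
		totalChunks = 1 // an empty payload is still sent as one message
	}
	return chunkSize, totalChunks, true
}

func main() {
	// 50-byte payload with ChunkMaxMessageSize 5, as in TestLargeMessage.
	fmt.Println(chunkPlan(50, 1024*1024, 100, 5))
	// Payload one byte over a 1 MiB broker limit, 100 bytes of metadata (assumed).
	fmt.Println(chunkPlan(1024*1024+1, 1024*1024, 100, 0))
}
```

If the metadata grew after `chunkSize` was computed, each serialized chunk could exceed the broker limit even though the plan looked valid, which is why the metadata has to be complete before this calculation.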





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961437360


##########
pulsar/producer_partition.go:
##########
@@ -463,12 +465,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request)
+	p.log.Debug("Received send request: ", *request.msg)

Review Comment:
   Because this call triggers a "copies lock value" warning. It's just a debug log, so this change can be removed.
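
For context, the warning comes from `go vet`'s copylocks check, which flags passing a struct that contains a lock by value. The sketch below assumes a struct that holds a `sync.Mutex`; `request` and `describe` are hypothetical names, not types from the client.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical struct that contains a lock.
type request struct {
	mu      sync.Mutex
	payload string
}

// describe takes a pointer, so the mutex is never copied.
func describe(r *request) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return fmt.Sprintf("payload=%q", r.payload)
}

func main() {
	r := &request{payload: "hello"}
	// fmt.Println(*r) would pass the struct by value and be flagged by
	// go vet's copylocks check; log a field or a formatted summary instead.
	fmt.Println(describe(r))
}
```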





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking to produce big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1196809954

   > Based on the Java implementation (ConsumerImpl.java), chunking also requires changes on the consumer side to assemble the chunks into the original message. Are you going to add support on the consumer side?
   
   The consumer-side implementation is already planned. It will be committed in another PR.




[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002951489


##########
pulsar/consumer_impl.go:
##########
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_impl.go:
##########
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002965643


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   L171 compares `bc.buffer.ReadableBytes() + msgSize` with `bc.maxMessageSize`, which the original code does not do.
   
   The check makes sure that a single batch does not exceed `maxMessageSize`. It is part of correctly calculating whether a message is too large.
   
   By the way, the comparison is too long to keep inline.
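
A stripped-down sketch of the check described here, under assumptions: `batch` is a hypothetical container with plain counters instead of the real buffer, and the "empty batch always has space" shortcut is assumed from the `return true` visible at the top of the hunk.

```go
package main

import "fmt"

// batch is a hypothetical, trimmed-down batch container.
type batch struct {
	numMessages    uint
	maxMessages    uint
	bufferedBytes  uint32
	maxBatchSize   uint32
	maxMessageSize uint32
}

// hasSpace reports whether payload fits without exceeding the message
// count, the configured batch size, or the broker's max message size.
func (b *batch) hasSpace(payload []byte) bool {
	if b.numMessages == 0 {
		return true // assumed: an empty batch accepts its first message
	}
	expectedSize := b.bufferedBytes + uint32(len(payload))
	return b.numMessages+1 <= b.maxMessages &&
		expectedSize <= b.maxBatchSize &&
		expectedSize <= b.maxMessageSize
}

func main() {
	b := &batch{numMessages: 1, maxMessages: 10, bufferedBytes: 90,
		maxBatchSize: 200, maxMessageSize: 100}
	fmt.Println(b.hasSpace(make([]byte, 10))) // lands exactly on maxMessageSize
	fmt.Println(b.hasSpace(make([]byte, 11))) // fits the batch size, not the message size
}
```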





[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961383344


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {

Review Comment:
   Could we add some unit tests to test this method?



##########
pulsar/producer_partition.go:
##########
@@ -463,12 +465,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request)
+	p.log.Debug("Received send request: ", *request.msg)

Review Comment:
   Why was this changed?



##########
pulsar/producer_partition.go:
##########
@@ -843,33 +1018,24 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		return
 	}
 
+	// bc only works when DisableBlockIfQueueFull is false
+	bc := make(chan struct{})
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
 		callback:         callback,
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
+		blockCh:          bc,
 	}
 	p.options.Interceptors.BeforeSend(p, msg)
 
-	if p.options.DisableBlockIfQueueFull {

Review Comment:
   If we remove this code, it seems the send will block at `p.eventsChan <- sr` once `maxPendingMessages` is reached, even if `DisableBlockIfQueueFull` is true, because `eventsChan` is initialized with `make(chan interface{}, maxPendingMessages)`.
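
To illustrate the concern in isolation, here is a minimal sketch. `trySend` is a hypothetical helper, not part of pulsar-client-go, and `events` merely stands in for `eventsChan`. A plain `ch <- v` on a full buffered channel blocks the caller, whereas a `select` with a `default` branch returns immediately, which is the behaviour one would expect when `DisableBlockIfQueueFull` is true.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send on a buffered
// channel and reports false instead of blocking when the buffer is full.
func trySend(ch chan interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default:
		// Buffer is full; a plain `ch <- v` would block here.
		return false
	}
}

func main() {
	// Stand-in for eventsChan with maxPendingMessages = 2.
	events := make(chan interface{}, 2)

	fmt.Println(trySend(events, "a")) // true
	fmt.Println(trySend(events, "b")) // true
	fmt.Println(trySend(events, "c")) // false: buffer full, no blocking
}
```

Whether the producer should use a non-blocking send or acquire the semaphore before enqueueing is a design choice for the PR; the sketch only shows why the buffered channel alone does not give non-blocking behaviour.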



##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is not closed.

Review Comment:
   ```suggestion
   	// Chunking can not be enabled when batching is enabled.
   ```



##########
pulsar/producer_partition.go:
##########
@@ -246,42 +256,14 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 
-	var encryptor internalcrypto.Encryptor
+	//var encryptor internalcrypto.Encryptor

Review Comment:
   Please remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -717,6 +886,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 			for _, i := range pi.sendRequests {
 				sr := i.(*sendRequest)
 				if sr.msg != nil {
+					// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {
+	mm = &pb.MessageMetadata{
+		ProducerName:     &p.producerName,
+		PublishTime:      proto.Uint64(internal.TimestampMillis(time.Now())),
+		ReplicateTo:      msg.ReplicationClusters,
+		UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
 	}
+	if deliverAt.UnixNano() > 0 {
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	return
+}
 
-	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Is this method used only when sending batched messages, and not for normal messages (neither chunked nor batched)?



##########
pulsar/producer_partition.go:
##########
@@ -827,6 +998,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
 
 	// wait for send request to finish
 	<-doneCh
+
+	// handle internal error
+	p.internalErrHandle(err)

Review Comment:
   Why do we need to introduce the internal error handler here? Can we handle this error inside the sending operation?



##########
pulsar/producer_partition.go:
##########
@@ -911,6 +1077,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
 				p.metrics.MessagesPublished.Inc()
 				p.metrics.MessagesPending.Dec()
+				// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r963695252


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {

Review Comment:
   Related issue: https://github.com/apache/pulsar/pull/16196/files#
   In the Java client, the payload chunk size may be calculated incorrectly for two reasons:
   1. The MessageMetadata is updated after the payload chunk size is computed, i.e. the actual metadata size would be greater.
   2. OpSendMsg#getMessageHeaderAndPayloadSize doesn't exclude all bytes other than the metadata and payload, e.g. the 4-byte checksum field.
   
   First, this PR takes care of the order in which the `messageMetadata` fields are set, so that `totalChunks` is calculated correctly. Second, since the Go client writes the data directly into the write buffer instead of creating a data buffer for `OpSendMsg` in advance, it doesn't have the problem described in reason 2.
   
   By the way, the unit test for `chunkSize` has been added to message_chunking_test.go.
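
The chunk arithmetic under discussion can be sketched on its own. This is a simplified illustration under stated assumptions, not the PR's code: `chunkPlan` and `chunkBounds` are hypothetical names, the metadata size is passed in as a plain integer (the PR obtains it from `mm.Size()` after the metadata fields are set), and integer ceiling division is used in place of the `math.Ceil` call on floats.

```go
package main

import (
	"errors"
	"fmt"
)

// errMetaTooLarge mirrors the error name used in the diff; the value here
// is local to this sketch.
var errMetaTooLarge = errors.New("message metadata too large")

// chunkPlan (hypothetical) returns the payload size of one chunk and the
// number of chunks needed. chunkMaxMessageSize == 0 means "not configured".
func chunkPlan(maxMessageSize, metadataSize, chunkMaxMessageSize, compressedSize int) (chunkSize, totalChunks int, err error) {
	chunkSize = maxMessageSize - metadataSize
	if chunkSize <= 0 {
		return 0, 0, errMetaTooLarge
	}
	if chunkMaxMessageSize > 0 && chunkMaxMessageSize < chunkSize {
		chunkSize = chunkMaxMessageSize
	}
	// Integer ceiling division; an empty payload still needs one message.
	totalChunks = (compressedSize + chunkSize - 1) / chunkSize
	if totalChunks < 1 {
		totalChunks = 1
	}
	return chunkSize, totalChunks, nil
}

// chunkBounds (hypothetical) returns the half-open byte range [lhs, rhs)
// of the chunk with the given ID.
func chunkBounds(chunkID, chunkSize, compressedSize int) (lhs, rhs int) {
	lhs = chunkID * chunkSize
	rhs = lhs + chunkSize
	if rhs > compressedSize {
		rhs = compressedSize
	}
	return lhs, rhs
}

func main() {
	const compressedSize = 250
	chunkSize, totalChunks, err := chunkPlan(100, 20, 0, compressedSize)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for id := 0; id < totalChunks; id++ {
		lhs, rhs := chunkBounds(id, chunkSize, compressedSize)
		fmt.Printf("chunk %d: [%d, %d)\n", id, lhs, rhs)
	}
}
```

With a 100-byte limit and 20 bytes of metadata, a 250-byte payload splits into four chunks, the last one covering bytes 240 to 250. The point made in the comment is that `metadataSize` must be measured after every metadata field has been set, otherwise `chunkSize` is overestimated.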





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r974254316


##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is enabled.
+	EnableChunking bool
+
+	// MaxChunkSize is the max size of single chunk payload.
+	// It will actually only take effect if it is smaller than broker.MaxMessageSize

Review Comment:
   Good suggestion. I have fixed it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983303572


##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)

Review Comment:
   You are right. I will fix it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983277131


##########
pulsar/producer_partition.go:
##########
@@ -1050,3 +1270,41 @@ func (p *partitionProducer) _getConn() internal.Connection {
 	//            invariant is broken
 	return p.conn.Load().(internal.Connection)
 }
+
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
+	if p.options.DisableBlockIfQueueFull {
+		if !p.publishSemaphore.TryAcquire() {
+			if sr.callback != nil {
+				sr.callback(nil, sr.msg, errSendQueueIsFull)
+			}
+			return false
+		}
+	} else if !p.publishSemaphore.Acquire(sr.ctx) {
+		sr.callback(nil, sr.msg, errContextExpired)
+		sr.blockCh <- struct{}{}
+		return false
+	} else if sr.totalChunks == 0 || sr.totalChunks == 1 || (sr.totalChunks > 1 && sr.chunkID == sr.totalChunks-1) {

Review Comment:
   OK, I have removed it. The blockCh will now be closed safely by a dedicated method.
   
   totalChunks is initialized to 0. Do you think it would be better to initialize it to 1?
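
One common way to close a channel safely from several code paths is to guard the `close` with `sync.Once`. The sketch below is an assumption about what such a method could look like, not the code that landed in the PR: `request`, `newRequest` and `stopBlock` are hypothetical names, and `request` is a pared-down stand-in for `sendRequest`.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical, pared-down stand-in for sendRequest.
type request struct {
	blockCh   chan struct{}
	closeOnce sync.Once
}

func newRequest() *request {
	return &request{blockCh: make(chan struct{})}
}

// stopBlock closes blockCh at most once, so the success path and every
// error path can call it without risking a "close of closed channel" panic.
func (r *request) stopBlock() {
	r.closeOnce.Do(func() { close(r.blockCh) })
}

func main() {
	r := newRequest()
	r.stopBlock()
	r.stopBlock() // second call is a no-op

	<-r.blockCh // receiving from a closed channel returns immediately
	fmt.Println("unblocked")
}
```

Closing the channel, rather than sending a single value on it, also releases any number of waiters, which avoids the blocking send on an unbuffered channel seen in the earlier revision.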





[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r972711264


##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is enabled.
+	EnableChunking bool
+
+	// MaxChunkSize is the max size of single chunk payload.
+	// It will actually only take effect if it is smaller than broker.MaxMessageSize
+	MaxChunkSize uint

Review Comment:
   It's better to keep it consistent with the Java client: `ChunkMaxMessageSize`.



##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)

Review Comment:
   It seems we need to release `publishSemaphore` here.
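
A minimal sketch of the pattern being asked for, under stated assumptions: `semaphore` is a hypothetical channel-based stand-in for `publishSemaphore`, and `send` collapses the acquire and the size check into one function for brevity (in the PR they live in different methods). The point is only that every early return after a successful acquire must give the permit back, otherwise the pending-message budget shrinks permanently.

```go
package main

import (
	"errors"
	"fmt"
)

// semaphore is a hypothetical channel-based stand-in for publishSemaphore.
type semaphore chan struct{}

// TryAcquire takes a permit if one is free and never blocks.
func (s semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release returns a previously acquired permit.
func (s semaphore) Release() { <-s }

var (
	errQueueFull    = errors.New("send queue is full")
	errMetaTooLarge = errors.New("message metadata too large")
)

// send (hypothetical) reports whether the message was accepted. On the
// metadata error path the permit is released before the callback fires.
func send(sem semaphore, metadataSize, maxMessageSize int, callback func(error)) bool {
	if !sem.TryAcquire() {
		callback(errQueueFull)
		return false
	}
	if maxMessageSize-metadataSize <= 0 {
		sem.Release() // without this, the permit would leak
		callback(errMetaTooLarge)
		return false
	}
	// Accepted: the permit stays held until the send receipt arrives.
	return true
}

func main() {
	sem := make(semaphore, 1)
	ok := send(sem, 120, 100, func(err error) { fmt.Println("callback:", err) })
	fmt.Println("accepted:", ok, "permits held:", len(sem))

	ok = send(sem, 20, 100, func(err error) { fmt.Println("callback:", err) })
	fmt.Println("accepted:", ok, "permits held:", len(sem))
}
```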



##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set MaxChunkSize
+		if p.options.MaxChunkSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
 	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {

Review Comment:
   It seems `DisableBlockIfQueueFull` will not work here.
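
One possible way to reserve a permit per chunk without blocking is an all-or-nothing acquire that rolls back on failure. This is a sketch of that idea only: `semaphore` and `tryAcquireN` are hypothetical, and the PR's `canAddToQueue` may handle this differently.

```go
package main

import "fmt"

// semaphore is a hypothetical channel-based stand-in for publishSemaphore.
type semaphore chan struct{}

// TryAcquire takes a permit if one is free and never blocks.
func (s semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release returns a previously acquired permit.
func (s semaphore) Release() { <-s }

// tryAcquireN (hypothetical) takes n permits without blocking. If fewer
// than n are available it returns the ones already taken and reports false,
// so a rejected chunked message does not leak permits.
func tryAcquireN(sem semaphore, n int) bool {
	for i := 0; i < n; i++ {
		if !sem.TryAcquire() {
			for j := 0; j < i; j++ {
				sem.Release()
			}
			return false
		}
	}
	return true
}

func main() {
	sem := make(semaphore, 3) // room for three pending messages

	fmt.Println(tryAcquireN(sem, 4), len(sem)) // false 0: rolled back
	fmt.Println(tryAcquireN(sem, 3), len(sem)) // true 3
}
```

The rollback matters because a loop that simply returns on the first failed acquire leaves the earlier permits held, which is the same leak as a missing `Release()` on an error path.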



##########
pulsar/producer_partition.go:
##########
@@ -560,49 +728,60 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		smm.Properties = internal.ConvertFromStringMap(msg.Properties)
 	}
 
+	var sequenceID uint64
 	if msg.SequenceID != nil {
-		sequenceID := uint64(*msg.SequenceID)
-		smm.SequenceId = proto.Uint64(sequenceID)
+		sequenceID = uint64(*msg.SequenceID)
+	} else {
+		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
 	}
 
-	if !sendAsBatch {
-		p.internalFlushCurrentBatch()
-	}
+	smm.SequenceId = proto.Uint64(sequenceID)
 
-	if msg.DisableReplication {
-		msg.ReplicationClusters = []string{"__local__"}
-	}
-	multiSchemaEnabled := !p.options.DisableMultiSchema
-	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-		msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
-	if !added {
-		// The current batch is full.. flush it and retry
+	return
+}
 
-		p.internalFlushCurrentBatch()
+func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+	compressedPayload []byte,
+	request *sendRequest,
+	maxMessageSize uint32) {
+	msg := request.msg
 
-		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
-			p.publishSemaphore.Release()
-			request.callback(nil, request.msg, errFailAddToBatch)
-			p.log.WithField("size", len(payload)).
-				WithField("properties", msg.Properties).
-				Error("unable to add message to batch")
-			return
-		}
-	}
+	payloadBuf := internal.NewBuffer(len(compressedPayload))
+	payloadBuf.Write(compressedPayload)
 
-	if !sendAsBatch || request.flushImmediately {
+	buffer := p.GetBuffer()
+	if buffer == nil {
+		buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+	}
 
-		p.internalFlushCurrentBatch()
+	sid := *mm.SequenceId
 
+	if err := internal.SingleSend(
+		buffer,
+		p.producerID,
+		sid,
+		mm,
+		payloadBuf,
+		p.encryptor,
+		maxMessageSize,
+	); err != nil {
+		request.callback(nil, request.msg, err)

Review Comment:
   It seems we need to release `publishSemaphore` here.



##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is enabled.
+	EnableChunking bool
+
+	// MaxChunkSize is the max size of single chunk payload.
+	// It will actually only take effect if it is smaller than broker.MaxMessageSize

Review Comment:
   ```suggestion
   	// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
   ```
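
   To illustrate what "only takes effect if it is smaller" means in practice, here is a rough sketch. `effectiveChunkSize` and `numChunks` are hypothetical helpers, and the sketch ignores the metadata overhead that a real client would have to subtract from the broker limit.

   ```go
   package main

   import "fmt"

   // effectiveChunkSize returns the payload limit for a single chunk.
   // configured is the user's setting (0 means unset) and brokerMax is the
   // maxMessageSize reported by the broker. Both names are hypothetical.
   func effectiveChunkSize(configured, brokerMax int) int {
   	if configured <= 0 || configured > brokerMax {
   		return brokerMax
   	}
   	return configured
   }

   // numChunks is the number of chunks needed for a payload of payloadSize
   // bytes. An empty payload still goes out as one message.
   func numChunks(payloadSize, chunkSize int) int {
   	if payloadSize == 0 {
   		return 1
   	}
   	return (payloadSize + chunkSize - 1) / chunkSize
   }

   func main() {
   	brokerMax := 1024 * 1024
   	fmt.Println(effectiveChunkSize(0, brokerMax))           // unset: broker limit applies
   	fmt.Println(effectiveChunkSize(5, brokerMax))           // smaller than the broker limit: takes effect
   	fmt.Println(effectiveChunkSize(2*brokerMax, brokerMax)) // larger: broker limit applies
   	fmt.Println(numChunks(50, effectiveChunkSize(5, brokerMax)))
   }
   ```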



##########
pulsar/producer_partition.go:
##########
@@ -745,28 +933,34 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-	batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
+	batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()

Review Comment:
   Why rename it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r966683647


##########
pulsar/consumer.go:
##########
@@ -189,6 +189,17 @@ type ConsumerOptions struct {
 	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
 	// Default: false
 	AckWithResponse bool
+
+	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
+	MaxPendingChunkedMessage int
+
+	// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message.
+	// The timing accuracy is 100ms level and the default value is 60 * 1000 millis

Review Comment:
   Looks like you should add a description of the minimum value.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961474187


##########
pulsar/producer_partition.go:
##########
@@ -827,6 +998,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
 
 	// wait for send request to finish
 	<-doneCh
+
+	// handle internal error
+	p.internalErrHandle(err)

Review Comment:
   If we handle this error inside the send operation, the only place we can do it is the following:
   https://github.com/apache/pulsar-client-go/blob/2d5f6fcfa22af4184793a22ca353731a02b54ca9/pulsar/producer_partition.go#L627-L636
   I don't think that is obvious. Do you have a better idea?





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961498806


##########
pulsar/producer_partition.go:
##########
@@ -246,42 +256,14 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 
-	var encryptor internalcrypto.Encryptor
+	//var encryptor internalcrypto.Encryptor

Review Comment:
   OK, I will remove it.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r964574405


##########
pulsar/producer_partition.go:
##########
@@ -463,12 +465,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request)
+	p.log.Debug("Received send request: ", *request.msg)

Review Comment:
   > Why changed here?
   
   If I don't change it, the code does not pass golangci-lint: `Error: copylocks: call of p.log.Debug copies lock value: github.com/apache/pulsar-client-go/pulsar.sendRequest contains sync.Once contains sync.Mutex (govet)`.
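
   For readers unfamiliar with the check, here is a small sketch of why dereferencing the whole request trips `copylocks` while dereferencing only the message does not. The `message` and `request` types are simplified, hypothetical stand-ins for `ProducerMessage` and `sendRequest`.

   ```go
   package main

   import (
   	"fmt"
   	"sync"
   )

   // message is a simplified stand-in for ProducerMessage.
   type message struct {
   	Payload []byte
   }

   // request is a simplified stand-in for sendRequest. Because sync.Once
   // contains a sync.Mutex, a request value must not be copied.
   type request struct {
   	msg          *message
   	callbackOnce sync.Once
   }

   // describe formats the request for logging without copying it.
   // Writing fmt.Sprint(*r) instead would pass a copy of the whole struct,
   // sync.Once included, which is what vet's copylocks check reports.
   func describe(r *request) string {
   	return fmt.Sprint("Received send request: ", *r.msg)
   }

   func main() {
   	r := &request{msg: &message{Payload: []byte("hi")}}
   	fmt.Println(describe(r))
   }
   ```

   Passing the pointer `r` itself would also satisfy the linter, at the cost of logging an address-prefixed struct instead of just the message.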





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1288763369

   /pulsarbot run-failure-checks




[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1003030610


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   > Good catch, but using inline looks like more clear, so like:
   
   I tried that, but the resulting line is too long to pass golangci-lint.
   
   > Good catch, but using inline looks like more clear, so like:
   > 
   > ```
   > return bc.numMessages+1 <= bc.maxMessages && expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
   > ```
   
   Done, thx.
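
   One way to reconcile the two points (a named `expectedSize` and the linter's line-length limit) is to break the boolean expression across lines. The sketch below is an illustration only: `batchLimits` is a hypothetical, flattened view of the container's fields, and the early `return true` from the real `hasSpace` is left out.

   ```go
   package main

   import "fmt"

   // batchLimits is a hypothetical, flattened view of the batch container's limits.
   type batchLimits struct {
   	maxMessages    uint
   	maxBatchSize   uint32
   	maxMessageSize uint32
   }

   // hasSpace reports whether one more message of msgSize bytes fits, given
   // the current message count and the bytes already buffered. The expected
   // size is computed once and checked against both the batch limit and the
   // broker's message size limit.
   func (l batchLimits) hasSpace(numMessages uint, buffered, msgSize uint32) bool {
   	expectedSize := buffered + msgSize
   	return numMessages+1 <= l.maxMessages &&
   		expectedSize <= l.maxBatchSize &&
   		expectedSize <= l.maxMessageSize
   }

   func main() {
   	l := batchLimits{maxMessages: 1000, maxBatchSize: 128 * 1024, maxMessageSize: 5 * 1024 * 1024}
   	fmt.Println(l.hasSpace(10, 1024, 512))
   	fmt.Println(l.hasSpace(10, 128*1024, 1))
   }
   ```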





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r970253454


##########
pulsar/consumer_impl.go:
##########
@@ -453,16 +465,16 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	partition := int(msgID.PartitionIdx())

Review Comment:
   > Why not make a method to check the partition or the msgID?
   
   Good suggestion, I will make the method.
   
   > You can improve messageID() method.
   
   `messageID()` converts the `MessageID` to a `trackingMessageID`, which loses the `chunkMessageID` type.
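
   A small sketch of the type-loss problem, under simplified assumptions: `MessageID`, `trackingID` and `chunkID` below are hypothetical stand-ins for the client's `MessageID`, `trackingMessageID` and `chunkMessageID`, and `toTracking` only mimics a converter that always returns the plain tracking type.

   ```go
   package main

   import "fmt"

   // MessageID is a cut-down version of the public interface.
   type MessageID interface {
   	EntryID() int64
   }

   // trackingID stands in for trackingMessageID.
   type trackingID struct{ entry int64 }

   func (t trackingID) EntryID() int64 { return t.entry }

   // chunkID stands in for chunkMessageID: it also remembers the first chunk.
   type chunkID struct {
   	trackingID
   	firstEntry int64
   }

   // toTracking always returns the plain tracking type, so the
   // chunk-specific field is dropped on the way.
   func toTracking(id MessageID) trackingID {
   	return trackingID{entry: id.EntryID()}
   }

   // firstEntryOf inspects the dynamic type instead, so chunk IDs keep their data.
   func firstEntryOf(id MessageID) int64 {
   	if c, ok := id.(chunkID); ok {
   		return c.firstEntry
   	}
   	return id.EntryID()
   }

   func main() {
   	var id MessageID = chunkID{trackingID: trackingID{entry: 7}, firstEntry: 3}
   	fmt.Println(firstEntryOf(toTracking(id)), firstEntryOf(id))
   }
   ```

   After the conversion the first-chunk position is gone, which is why a separate partition check that leaves the ID untouched is used instead.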





[GitHub] [pulsar-client-go] maraiskruger1980 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by "maraiskruger1980 (via GitHub)" <gi...@apache.org>.
maraiskruger1980 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1538054849

   Does chunking work with shared subscriptions, as introduced in Pulsar 2.11?




[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r970253975


##########
pulsar/internal/timewheel.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"container/list"
+	"log"
+	"time"
+)
+
+// TimeWheel can execute job after waiting given duration
+type TimeWheel struct {

Review Comment:
   I just realized that this code comes from an existing repo. I will remove it.





[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r969794999


##########
pulsar/internal/timewheel.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"container/list"
+	"log"
+	"time"
+)
+
+// TimeWheel can execute job after waiting given duration
+type TimeWheel struct {

Review Comment:
   This code appears to come from https://github.com/ouqiang/timewheel/blob/master/timewheel.go





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983459841


##########
pulsar/consumer_partition.go:
##########
@@ -802,6 +886,55 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	return nil
 }
 
+func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
+	msgMeta *pb.MessageMetadata,
+	pbMsgID *pb.MessageIdData) internal.Buffer {
+	uuid := msgMeta.GetUuid()
+	numChunks := msgMeta.GetNumChunksFromMsg()
+	totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
+	chunkID := msgMeta.GetChunkId()
+	msgID := messageID{
+		ledgerID:     int64(pbMsgID.GetLedgerId()),
+		entryID:      int64(pbMsgID.GetEntryId()),
+		batchIdx:     -1,
+		partitionIdx: pc.partitionIdx,
+	}
+
+	if msgMeta.GetChunkId() == 0 {
+		pc.chunkedMsgCtxMap.addIfAbsent(uuid,
+			numChunks,
+			totalChunksSize,
+		)
+	}
+
+	ctx := pc.chunkedMsgCtxMap.get(uuid)
+
+	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+		lastChunkedMsgID := -1
+		totalChunks := -1
+		if ctx != nil {
+			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
+			totalChunks = int(ctx.totalChunks)
+			ctx.chunkedMsgBuffer.Clear()
+		}
+		pc.log.Warnf(fmt.Sprintf(
+			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
+			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+		pc.chunkedMsgCtxMap.remove(uuid)
+		atomic.AddInt32(&pc.availablePermits, 1)
+		return nil
+	}
+
+	ctx.refresh(chunkID, msgID, compressedPayload)

Review Comment:
   Good suggestion, thx.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983546515


##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   Yes, the biggest challenge is that it is hard to `stopBroker` here, so `TestExpireIncompleteChunks` does not exercise a "real" `expire`.
   
   Do you think such a test has enough coverage?





[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002836092


##########
pulsar/consumer_impl.go:
##########
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_partition.go:
##########
@@ -698,8 +746,23 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		}
 	}
 
+	isChunkedMsg := false
+	if msgMeta.GetNumChunksFromMsg() > 1 {
+		isChunkedMsg = true
+	}
+
+	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)

Review Comment:
   ```suggestion
   	var processedPayloadBuffer internal.Buffer
   ```



##########
pulsar/consumer_impl.go:
##########
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   Same as above.



##########
pulsar/consumer_impl.go:
##########
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   This misses converting the `msgID` from `MessageID` to the `trackingMessageID` type; I'm not sure whether we need that.
   
   Why not use `messageID()`? What did I miss?



##########
pulsar/producer_partition.go:
##########
@@ -299,6 +278,26 @@ func (p *partitionProducer) grabCnx() error {
 	if err != nil {
 		return err
 	}
+
+	if p.options.DisableBatching {

Review Comment:
   The following code looks clearer:
   ```
   	if !p.options.DisableBatching {
   		batcherBuilderType := p.options.BatcherBuilderType
   		provider, err := GetBatcherBuilderProvider(batcherBuilderType)
   		if err != nil {
   			return err
   		}
   		maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
   		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
   			maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
   			compression.Level(p.options.CompressionLevel),
   			p,
   			p.log,
   			p.encryptor)
   		if err != nil {
   			return err
   		}
   	}
   ```



##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   Please revert this code (lines 170-174).





[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002998935


##########
pulsar/internal/batch_builder.go:
##########
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize

Review Comment:
   Good catch, but inlining it looks clearer.





[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r980988431


##########
pulsar/consumer_impl.go:
##########
@@ -91,6 +90,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		}
 	}
 
+	if options.MaxPendingChunkedMessage == 0 {
+		options.MaxPendingChunkedMessage = 100
+	}
+
+	// the minimum timer interval is 100ms

Review Comment:
   It seems the Java client doesn't enforce a minimum value for this parameter, so adding one here would make the two clients behave inconsistently. Even if we do need a minimum value, we should return an error rather than change the value the user set; otherwise users will be confused.
   
   I think we need to implement it in the Java client first, and that can be done in another PR.
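
   As a rough sketch of the "error instead of silent adjustment" option: `resolveExpire`, `minExpire` and `defaultExpire` are hypothetical names, the 100ms and 60s values are taken from the option's doc comment in this PR, and treating 100ms as a hard minimum is exactly the assumption under discussion.

   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   // minExpire is the 100ms timer granularity mentioned in the diff.
   const minExpire = 100 * time.Millisecond

   // defaultExpire mirrors the documented default of 60 * 1000 millis.
   const defaultExpire = 60 * time.Second

   // resolveExpire applies the default for an unset value and rejects values
   // below the minimum instead of silently raising them.
   func resolveExpire(configured time.Duration) (time.Duration, error) {
   	if configured == 0 {
   		return defaultExpire, nil
   	}
   	if configured < minExpire {
   		return 0, fmt.Errorf("expire time %v is below the minimum %v", configured, minExpire)
   	}
   	return configured, nil
   }

   func main() {
   	fmt.Println(resolveExpire(0))
   	fmt.Println(resolveExpire(10 * time.Millisecond))
   }
   ```

   With this shape, a caller sees the problem at consumer creation time rather than discovering later that a different expiry was applied.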



##########
pulsar/producer_partition.go:
##########
@@ -1050,3 +1270,41 @@ func (p *partitionProducer) _getConn() internal.Connection {
 	//            invariant is broken
 	return p.conn.Load().(internal.Connection)
 }
+
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
+	if p.options.DisableBlockIfQueueFull {
+		if !p.publishSemaphore.TryAcquire() {
+			if sr.callback != nil {
+				sr.callback(nil, sr.msg, errSendQueueIsFull)
+			}
+			return false
+		}
+	} else if !p.publishSemaphore.Acquire(sr.ctx) {
+		sr.callback(nil, sr.msg, errContextExpired)
+		sr.blockCh <- struct{}{}
+		return false
+	} else if sr.totalChunks == 0 || sr.totalChunks == 1 || (sr.totalChunks > 1 && sr.chunkID == sr.totalChunks-1) {

Review Comment:
   Could you move this logic out of `canAddToQueue`? I think it's better to move it to `internalSend`.
   Also, I don't think `totalChunks` can ever be `0`.
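
   If `totalChunks` is indeed always at least 1, the three-way condition in the hunk collapses to a single comparison. The helper below is a hypothetical sketch of that simplification; it assumes an unchunked message is represented as one chunk with `chunkID` 0.

   ```go
   package main

   import "fmt"

   // isFinalChunk reports whether chunkID is the last chunk of a message.
   // It assumes totalChunks >= 1: a message that is not chunked counts as a
   // single chunk with chunkID 0, so totalChunks == 0 needs no special case.
   func isFinalChunk(chunkID, totalChunks int) bool {
   	return chunkID == totalChunks-1
   }

   func main() {
   	for id := 0; id < 3; id++ {
   		fmt.Println(id, isFinalChunk(id, 3))
   	}
   	fmt.Println(isFinalChunk(0, 1)) // unchunked message
   }
   ```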



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	ctx, ok := c.chunkedMsgCtxs[uuid]
+	if !ok {
+		return
+	}
+	if autoAck {
+		ctx.discard(c.pc)
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+}
+
+func (c *chunkedMsgCtxMap) removeChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
+	timer := time.NewTimer(expire)
+	<-timer.C
+	c.removeChunkMessage(uuid, autoAck)
+}
+
+func (c *chunkedMsgCtxMap) Close() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.closed = true
+}
+
+type unAckChunksTracker struct {
+	chunkIDs map[chunkMessageID][]messageID
+	pc       *partitionConsumer
+	mu       sync.Mutex
+}
+
+func newUnAckChunksTracker(pc *partitionConsumer) *unAckChunksTracker {
+	return &unAckChunksTracker{
+		chunkIDs: make(map[chunkMessageID][]messageID),
+		pc:       pc,
+	}
+}
+
+func (u *unAckChunksTracker) add(cmid chunkMessageID, ids []messageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	u.chunkIDs[cmid] = ids
+}
+
+func (u *unAckChunksTracker) get(cmid chunkMessageID) []messageID {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	return u.chunkIDs[cmid]
+}
+
+func (u *unAckChunksTracker) remove(cmid chunkMessageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	delete(u.chunkIDs, cmid)
+}
+
+func (u *unAckChunksTracker) ack(cmid chunkMessageID) error {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		if err := u.pc.AckID(id); err != nil {
+			return err
+		}
+	}
+	u.remove(cmid)
+	return nil
+}
+
+func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		u.pc.nackTracker.Add(id)
+		u.pc.metrics.NacksCounter.Inc()

Review Comment:
   I think it's better to call `u.pc.NackID` here. Otherwise, we have to maintain the same nack logic in two places.
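
   A minimal sketch of the delegation suggested above. The `messageID` and `partitionConsumer` types here are simplified stand-ins rather than the client's real types, and `NackID` is assumed to be the one place that both records the nack and bumps the counter.

   ```go
   package main

   import "fmt"

   // messageID is a simplified stand-in for the client's internal message ID.
   type messageID struct {
   	ledgerID, entryID int64
   }

   // partitionConsumer is a stand-in that only models negative acknowledgement.
   type partitionConsumer struct {
   	nacked    []messageID
   	nackCount int
   }

   // NackID is the single place that records a nack and updates the counter.
   func (pc *partitionConsumer) NackID(id messageID) {
   	pc.nacked = append(pc.nacked, id)
   	pc.nackCount++
   }

   // unAckChunksTracker maps a chunked message (keyed here by a plain
   // messageID for brevity) to the IDs of all of its chunks.
   type unAckChunksTracker struct {
   	chunkIDs map[messageID][]messageID
   	pc       *partitionConsumer
   }

   // nack delegates every chunk to NackID instead of touching the
   // nack tracker and the metrics directly.
   func (u *unAckChunksTracker) nack(cmid messageID) {
   	for _, id := range u.chunkIDs[cmid] {
   		u.pc.NackID(id)
   	}
   }

   func main() {
   	pc := &partitionConsumer{}
   	key := messageID{1, 2}
   	u := &unAckChunksTracker{
   		chunkIDs: map[messageID][]messageID{key: {{1, 0}, {1, 1}, {1, 2}}},
   		pc:       pc,
   	}
   	u.nack(key)
   	fmt.Println("nacked chunks:", pc.nackCount)
   }
   ```

   With this shape, any later change to how a nack is recorded only has to be made in `NackID`.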



##########
pulsar/consumer_partition.go:
##########
@@ -802,6 +886,55 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	return nil
 }
 
+func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
+	msgMeta *pb.MessageMetadata,
+	pbMsgID *pb.MessageIdData) internal.Buffer {
+	uuid := msgMeta.GetUuid()
+	numChunks := msgMeta.GetNumChunksFromMsg()
+	totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
+	chunkID := msgMeta.GetChunkId()
+	msgID := messageID{
+		ledgerID:     int64(pbMsgID.GetLedgerId()),
+		entryID:      int64(pbMsgID.GetEntryId()),
+		batchIdx:     -1,
+		partitionIdx: pc.partitionIdx,
+	}
+
+	if msgMeta.GetChunkId() == 0 {
+		pc.chunkedMsgCtxMap.addIfAbsent(uuid,
+			numChunks,
+			totalChunksSize,
+		)
+	}
+
+	ctx := pc.chunkedMsgCtxMap.get(uuid)
+
+	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+		lastChunkedMsgID := -1
+		totalChunks := -1
+		if ctx != nil {
+			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
+			totalChunks = int(ctx.totalChunks)
+			ctx.chunkedMsgBuffer.Clear()
+		}
+		pc.log.Warnf(fmt.Sprintf(
+			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
+			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+		pc.chunkedMsgCtxMap.remove(uuid)
+		atomic.AddInt32(&pc.availablePermits, 1)
+		return nil
+	}
+
+	ctx.refresh(chunkID, msgID, compressedPayload)

Review Comment:
   It's better to rename it to `ctx.append`.



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	totalProducers := 5
+	producers := make([]Producer, 0, 20)
+	defer func() {
+		for _, p := range producers {
+			p.Close()
+		}
+	}()
+
+	clients := make([]Client, 0, 20)
+	defer func() {
+		for _, c := range clients {
+			c.Close()
+		}
+	}()
+
+	for j := 0; j < totalProducers; j++ {
+		pc, err := NewClient(ClientOptions{
+			URL: lookupURL,
+		})
+		assert.Nil(t, err)
+		clients = append(clients, pc)
+		producer, err := pc.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: 10,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, producer)
+		producers = append(producers, producer)
+	}
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                    topic,
+		Type:                     Exclusive,
+		SubscriptionName:         "chunk-subscriber",
+		MaxPendingChunkedMessage: 1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	totalMsgs := 40
+	wg := sync.WaitGroup{}
+	wg.Add(totalMsgs * totalProducers)
+	for i := 0; i < totalMsgs; i++ {
+		for j := 0; j < totalProducers; j++ {
+			p := producers[j]
+			go func() {
+				ID, err := p.Send(context.Background(), &ProducerMessage{
+					Payload: createTestMessagePayload(50),
+				})
+				assert.NoError(t, err)
+				assert.NotNil(t, ID)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+
+	received := 0
+	for i := 0; i < totalMsgs*totalProducers; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		if msg == nil || (err != nil && errors.Is(err, context.DeadlineExceeded)) {
+			break
+		}
+
+		received++
+
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	c, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		Type:                        Exclusive,
+		SubscriptionName:            "chunk-subscriber",
+		ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	uuid := "test-uuid"
+	chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+	chunkCtxMap.addIfAbsent(uuid, 2, 100)
+	ctx := chunkCtxMap.get(uuid)
+	assert.NotNil(t, ctx)
+
+	time.Sleep(400 * time.Millisecond)
+
+	ctx = chunkCtxMap.get(uuid)
+	assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		EnableChunking:          true,
+		DisableBatching:         true,
+		MaxPendingMessages:      10,
+		ChunkMaxMessageSize:     50,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1000),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	totalMessages := 5
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "default-seek",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	msgIDs := make([]MessageID, 0)
+	for i := 0; i < totalMessages; i++ {
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(100),
+		})
+		assert.NoError(t, err)
+		msgIDs = append(msgIDs, ID)
+	}
+
+	for i := 0; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	err = consumer.Seek(msgIDs[1])
+	assert.NoError(t, err)
+
+	for i := 1; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}

Review Comment:
   It seems there is currently no `startMessageIdInclusive` option in the consumer options, although it exists in the reader options. This test should create two consumers, one with `startMessageIdInclusive` set to true and one with it set to false. If it is true, the consumer should receive `msgIDs[1]` after the seek operation; otherwise it should receive `msgIDs[2]`.
   
   The default value of `startMessageIdInclusive` is false, so by default the consumer should receive `msgIDs[2]` after the seek operation. We need to add this option to the consumer, but if that is too complex for this PR, it can go in a separate PR.
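
   The expectation described above can be sketched without a broker. This is only an illustration: `expectedAfterSeek` is a hypothetical helper, the string IDs stand in for real message IDs, and the `inclusive` flag models the proposed consumer option rather than an existing API.

   ```go
   package main

   import "fmt"

   // expectedAfterSeek returns the IDs a consumer is expected to receive
   // after seeking to ids[seekIdx]. With inclusive set, delivery restarts
   // at the seek target itself; otherwise it restarts at the next message.
   func expectedAfterSeek(ids []string, seekIdx int, inclusive bool) []string {
   	start := seekIdx
   	if !inclusive {
   		start = seekIdx + 1
   	}
   	if start > len(ids) {
   		start = len(ids)
   	}
   	return ids[start:]
   }

   func main() {
   	ids := []string{"m0", "m1", "m2", "m3", "m4"}
   	for _, inclusive := range []bool{true, false} {
   		got := expectedAfterSeek(ids, 1, inclusive)
   		fmt.Printf("inclusive=%v first=%s count=%d\n", inclusive, got[0], len(got))
   	}
   }
   ```

   A table-driven test over both values of the flag would then compare the received IDs against this expected slice.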



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {

Review Comment:
   I think we can rename this method to `discardChunkedMessage`. Otherwise, it will confuse others because there are two `remove` methods.



##########
pulsar/impl_message.go:
##########
@@ -372,3 +386,28 @@ func (t *ackTracker) completed() bool {
 	defer t.Unlock()
 	return len(t.batchIDs.Bits()) == 0
 }
+
+type chunkMessageID struct {
+	messageID
+
+	firstChunkID messageID
+	receivedTime time.Time
+
+	consumer acker
+}
+
+func newChunkMessageID(firstChunkID messageID, lastChunkID messageID) chunkMessageID {
+	return chunkMessageID{
+		messageID:    lastChunkID,
+		firstChunkID: firstChunkID,
+		receivedTime: time.Now(),
+	}
+}
+
+func (id chunkMessageID) String() string {
+	return fmt.Sprintf("%s;%s", id.firstChunkID.String(), id.messageID.String())
+}
+
+func (id chunkMessageID) Serialize() []byte {
+	return id.firstChunkID.Serialize()

Review Comment:
   Should use `MessageIdData` to serialize the chunk message id.
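
   To illustrate the concern: serializing only `firstChunkID` loses the last chunk's ID, so the chunk message ID cannot be rebuilt from the bytes. The sketch below uses a fixed-width `encoding/binary` layout purely as a stand-in for the protobuf `MessageIdData` encoding. The types are simplified and `deserializeChunkMessageID` is a hypothetical helper.

   ```go
   package main

   import (
   	"encoding/binary"
   	"errors"
   	"fmt"
   )

   // messageID is a simplified stand-in for the client's internal message ID.
   type messageID struct {
   	ledgerID, entryID int64
   }

   // chunkMessageID embeds the last chunk's ID and also keeps the first chunk's ID.
   type chunkMessageID struct {
   	messageID
   	firstChunkID messageID
   }

   // Serialize writes both the last and the first chunk ID so that the
   // full chunk message ID survives a round trip.
   func (id chunkMessageID) Serialize() []byte {
   	buf := make([]byte, 32)
   	binary.BigEndian.PutUint64(buf[0:], uint64(id.ledgerID))
   	binary.BigEndian.PutUint64(buf[8:], uint64(id.entryID))
   	binary.BigEndian.PutUint64(buf[16:], uint64(id.firstChunkID.ledgerID))
   	binary.BigEndian.PutUint64(buf[24:], uint64(id.firstChunkID.entryID))
   	return buf
   }

   // deserializeChunkMessageID is the inverse of Serialize.
   func deserializeChunkMessageID(b []byte) (chunkMessageID, error) {
   	if len(b) != 32 {
   		return chunkMessageID{}, errors.New("unexpected length for chunk message id")
   	}
   	return chunkMessageID{
   		messageID: messageID{
   			ledgerID: int64(binary.BigEndian.Uint64(b[0:])),
   			entryID:  int64(binary.BigEndian.Uint64(b[8:])),
   		},
   		firstChunkID: messageID{
   			ledgerID: int64(binary.BigEndian.Uint64(b[16:])),
   			entryID:  int64(binary.BigEndian.Uint64(b[24:])),
   		},
   	}, nil
   }

   func main() {
   	id := chunkMessageID{messageID: messageID{3, 7}, firstChunkID: messageID{3, 5}}
   	back, err := deserializeChunkMessageID(id.Serialize())
   	fmt.Println(back == id, err)
   }
   ```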



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   I think we can remove this test because it is already covered by other tests. As for `testPublishWithFailure` in the Java client, it's hard to simulate `stopBroker` here, so I don't think we need to add that test.



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+func TestPublishChunkWithFailure(t *testing.T) {

Review Comment:
   ```suggestion
   func TestPublishLargeMessagesFailedWithoutChunking(t *testing.T) {
   ```



##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	totalProducers := 5
+	producers := make([]Producer, 0, 20)
+	defer func() {
+		for _, p := range producers {
+			p.Close()
+		}
+	}()
+
+	clients := make([]Client, 0, 20)
+	defer func() {
+		for _, c := range clients {
+			c.Close()
+		}
+	}()
+
+	for j := 0; j < totalProducers; j++ {
+		pc, err := NewClient(ClientOptions{
+			URL: lookupURL,
+		})
+		assert.Nil(t, err)
+		clients = append(clients, pc)
+		producer, err := pc.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: 10,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, producer)
+		producers = append(producers, producer)
+	}
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                    topic,
+		Type:                     Exclusive,
+		SubscriptionName:         "chunk-subscriber",
+		MaxPendingChunkedMessage: 1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	totalMsgs := 40
+	wg := sync.WaitGroup{}
+	wg.Add(totalMsgs * totalProducers)
+	for i := 0; i < totalMsgs; i++ {
+		for j := 0; j < totalProducers; j++ {
+			p := producers[j]
+			go func() {
+				ID, err := p.Send(context.Background(), &ProducerMessage{
+					Payload: createTestMessagePayload(50),
+				})
+				assert.NoError(t, err)
+				assert.NotNil(t, ID)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+
+	received := 0
+	for i := 0; i < totalMsgs*totalProducers; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		if msg == nil || (err != nil && errors.Is(err, context.DeadlineExceeded)) {
+			break
+		}
+
+		received++
+
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	c, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		Type:                        Exclusive,
+		SubscriptionName:            "chunk-subscriber",
+		ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	uuid := "test-uuid"
+	chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+	chunkCtxMap.addIfAbsent(uuid, 2, 100)
+	ctx := chunkCtxMap.get(uuid)
+	assert.NotNil(t, ctx)
+
+	time.Sleep(400 * time.Millisecond)
+
+	ctx = chunkCtxMap.get(uuid)
+	assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		EnableChunking:          true,
+		DisableBatching:         true,
+		MaxPendingMessages:      10,
+		ChunkMaxMessageSize:     50,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1000),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	totalMessages := 5
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "default-seek",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	msgIDs := make([]MessageID, 0)
+	for i := 0; i < totalMessages; i++ {
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(100),
+		})
+		assert.NoError(t, err)
+		msgIDs = append(msgIDs, ID)
+	}
+
+	for i := 0; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	err = consumer.Seek(msgIDs[1])
+	assert.NoError(t, err)
+
+	for i := 1; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	// todo: add reader seek test when support reader read chunk message
+}
+
+func TestChunkAckAndNAck(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		Type:                Exclusive,
+		SubscriptionName:    "default-seek",
+		NackRedeliveryDelay: time.Second,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	content := createTestMessagePayload(100)
+
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: content,
+	})
+	assert.NoError(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	msg, err := consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+
+	consumer.Nack(msg)
+	time.Sleep(time.Second * 2)
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
+	msg, err = consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+}
+
+func TestChunkSize(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// the default message metadata size for string schema
+	// The proto messageMetaData size as following. (all with tag) (maxMessageSize = 1024 * 1024)
+	// | producerName | sequenceID | publishTime | uncompressedSize |
+	// | ------------ | ---------- | ----------- | ---------------- |
+	// | 6            | 2          | 7           | 4                |
+	payloadChunkSize := _brokerMaxMessageSize - 19
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Name:            "test",
+		Topic:           topic,
+		EnableChunking:  true,
+		DisableBatching: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ {
+		msgID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(size),
+		})
+		assert.NoError(t, err)
+		if size <= payloadChunkSize {
+			_, ok := msgID.(messageID)
+			assert.Equal(t, true, ok)
+		} else {
+			_, ok := msgID.(chunkMessageID)
+			assert.Equal(t, true, ok)
+		}
+	}
+}
+
+func TestChunkMultiTopicConsumerReceive(t *testing.T) {
+	topic1 := newTopicName()
+	topic2 := newTopicName()
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	topics := []string{topic1, topic2}
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topics:           topics,
+		SubscriptionName: "multi-topic-sub",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	maxSize := 50
+
+	// produce messages
+	for i, topic := range topics {
+		p, err := client.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: uint(maxSize),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = genMessages(p, 10, func(idx int) string {
+			return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, string(createTestMessagePayload(100)))
+		})
+		p.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	receivedTopic1 := 0
+	receivedTopic2 := 0
+	// nolint
+	for receivedTopic1+receivedTopic2 < 20 {
+		select {
+		case cm, ok := <-consumer.Chan():
+			if ok {
+				msg := string(cm.Payload())
+				if strings.HasPrefix(msg, "topic-1") {
+					receivedTopic1++
+				} else if strings.HasPrefix(msg, "topic-2") {
+					receivedTopic2++
+				}
+				consumer.Ack(cm.Message)
+			} else {
+				t.Fail()
+			}
+		}

Review Comment:
   It would be better to add a timeout to this select. As written, the loop blocks forever if fewer than 20 messages arrive.
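
One way to bound the wait, as a minimal sketch rather than the change that landed in the PR: the helper `receiveN` and the constant `demoTimeout` are hypothetical names, and a plain string channel stands in for `consumer.Chan()`. Each receive races against `time.After`, so the loop returns an error instead of hanging when fewer messages arrive than expected.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an illustrative per-message deadline (assumption, not from the PR).
const demoTimeout = 50 * time.Millisecond

// receiveN reads n items from ch and gives up if any single receive
// takes longer than perMsgTimeout or the channel is closed early.
func receiveN(ch <-chan string, n int, perMsgTimeout time.Duration) (int, error) {
	received := 0
	for received < n {
		select {
		case _, ok := <-ch:
			if !ok {
				return received, errors.New("channel closed before all messages arrived")
			}
			received++
		case <-time.After(perMsgTimeout):
			return received, errors.New("timed out waiting for message")
		}
	}
	return received, nil
}

func main() {
	ch := make(chan string, 2)
	ch <- "topic-1-hello"
	ch <- "topic-2-hello"

	// Only two messages are available, so asking for three hits the timeout.
	n, err := receiveN(ch, 3, demoTimeout)
	fmt.Println(n, err)
}
```

In a test, the error branch would typically call `t.Fatal` so that the failure is reported instead of stalling the whole suite.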



##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)

Review Comment:
   This should release the oldest pending chunked message, but it appears to remove the one that was just added.
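
The eviction order described here can be sketched as follows. This is a simplified, hypothetical stand-in for `chunkedMsgCtxMap` (the type `pendingSet` and its fields are assumptions, and locking, goroutines and acking are left out): when the queue grows past `maxPending`, the entry at the front of the `container/list` queue, which is the oldest, is dropped rather than the `uuid` that was just pushed.

```go
package main

import (
	"container/list"
	"fmt"
)

// pendingSet is a hypothetical, single-threaded stand-in for chunkedMsgCtxMap.
type pendingSet struct {
	ctxs       map[string]struct{}
	queue      *list.List // insertion order, oldest uuid at the front
	maxPending int
}

func newPendingSet(maxPending int) *pendingSet {
	return &pendingSet{
		ctxs:       make(map[string]struct{}),
		queue:      list.New(),
		maxPending: maxPending,
	}
}

// addIfAbsent registers uuid and returns the uuids evicted to stay within
// maxPending. Eviction always starts from the front (oldest) of the queue.
func (p *pendingSet) addIfAbsent(uuid string) []string {
	if _, ok := p.ctxs[uuid]; !ok {
		p.ctxs[uuid] = struct{}{}
		p.queue.PushBack(uuid)
	}
	var evicted []string
	for p.maxPending > 0 && p.queue.Len() > p.maxPending {
		oldest := p.queue.Remove(p.queue.Front()).(string)
		delete(p.ctxs, oldest)
		evicted = append(evicted, oldest)
	}
	return evicted
}

func main() {
	p := newPendingSet(1)
	fmt.Println(p.addIfAbsent("producer-1"))
	fmt.Println(p.addIfAbsent("producer-2"))
}
```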



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973556288


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	if !sendAsBatch {
+		// update sequence id for metadata, make the size of msgMetadata more accurate
+		// batch sending will update sequence ID in the BatchBuilder
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)

Review Comment:
   You're right. It also seems that some of the original code did not release `publishSemaphore` either.
   https://github.com/apache/pulsar-client-go/blob/edd5c71651b79bd35358a51ae3925905ed9f17e1/pulsar/producer_partition.go#L484-L490
   
   In the new commit I have fixed every path that should release `publishSemaphore`.
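
A minimal sketch of the pattern under discussion, not the code from the commit: the channel-backed `semaphore` type and the function `trySend` are hypothetical stand-ins for `publishSemaphore` and `internalSend`. A single deferred check releases the permit on every error return, so an early `return` added later cannot leak it.

```go
package main

import (
	"errors"
	"fmt"
)

// semaphore is a hypothetical channel-backed stand-in for publishSemaphore.
type semaphore chan struct{}

func (s semaphore) Acquire() { s <- struct{}{} }
func (s semaphore) Release() { <-s }

var errMetaTooLarge = errors.New("message metadata is too large")

// trySend takes a permit and hands it back on any failure. On success the
// permit stays held; in a real producer it would be released on the receipt.
func trySend(sem semaphore, metaSize, maxMessageSize int) (err error) {
	sem.Acquire()
	defer func() {
		if err != nil {
			sem.Release()
		}
	}()

	if maxMessageSize-metaSize <= 0 {
		return errMetaTooLarge
	}
	return nil
}

func main() {
	sem := make(semaphore, 1)
	fmt.Println(trySend(sem, 10, 5), len(sem))
	fmt.Println(trySend(sem, 1, 5), len(sem))
}
```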





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r974260113


##########
pulsar/producer_partition.go:
##########
@@ -560,49 +728,60 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		smm.Properties = internal.ConvertFromStringMap(msg.Properties)
 	}
 
+	var sequenceID uint64
 	if msg.SequenceID != nil {
-		sequenceID := uint64(*msg.SequenceID)
-		smm.SequenceId = proto.Uint64(sequenceID)
+		sequenceID = uint64(*msg.SequenceID)
+	} else {
+		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
 	}
 
-	if !sendAsBatch {
-		p.internalFlushCurrentBatch()
-	}
+	smm.SequenceId = proto.Uint64(sequenceID)
 
-	if msg.DisableReplication {
-		msg.ReplicationClusters = []string{"__local__"}
-	}
-	multiSchemaEnabled := !p.options.DisableMultiSchema
-	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-		msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
-	if !added {
-		// The current batch is full.. flush it and retry
+	return
+}
 
-		p.internalFlushCurrentBatch()
+func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+	compressedPayload []byte,
+	request *sendRequest,
+	maxMessageSize uint32) {
+	msg := request.msg
 
-		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
-			p.publishSemaphore.Release()
-			request.callback(nil, request.msg, errFailAddToBatch)
-			p.log.WithField("size", len(payload)).
-				WithField("properties", msg.Properties).
-				Error("unable to add message to batch")
-			return
-		}
-	}
+	payloadBuf := internal.NewBuffer(len(compressedPayload))
+	payloadBuf.Write(compressedPayload)
 
-	if !sendAsBatch || request.flushImmediately {
+	buffer := p.GetBuffer()
+	if buffer == nil {
+		buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+	}
 
-		p.internalFlushCurrentBatch()
+	sid := *mm.SequenceId
 
+	if err := internal.SingleSend(
+		buffer,
+		p.producerID,
+		sid,
+		mm,
+		payloadBuf,
+		p.encryptor,
+		maxMessageSize,
+	); err != nil {
+		request.callback(nil, request.msg, err)

Review Comment:
   Same as above.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r974253875


##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is enabled.
+	EnableChunking bool
+
+	// MaxChunkSize is the max size of single chunk payload.
+	// It will actually only take effect if it is smaller than broker.MaxMessageSize
+	MaxChunkSize uint

Review Comment:
   I think you're right. I have fixed it in the new commit.
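
For reference, the way the configured chunk size interacts with the broker limit can be sketched with integer arithmetic. The function `chunkPlan` and its parameter names are hypothetical. It mirrors the logic in `internalSend` (per-chunk payload is the broker limit minus the metadata size, capped by the configured maximum when that is non-zero) but uses integer ceiling division instead of `math.Ceil` on floats.

```go
package main

import (
	"errors"
	"fmt"
)

// chunkPlan returns the per-chunk payload size and the number of chunks.
// configuredMax == 0 means "no explicit limit" (assumption mirroring the option).
func chunkPlan(payloadSize, brokerMax, metaSize, configuredMax int) (chunkSize, totalChunks int, err error) {
	chunkSize = brokerMax - metaSize
	if chunkSize <= 0 {
		return 0, 0, errors.New("metadata alone exceeds the broker max message size")
	}
	// The configured value only takes effect when it is the smaller of the two.
	if configuredMax > 0 && configuredMax < chunkSize {
		chunkSize = configuredMax
	}
	// Ceiling division; an empty payload still occupies one message.
	totalChunks = (payloadSize + chunkSize - 1) / chunkSize
	if totalChunks < 1 {
		totalChunks = 1
	}
	return chunkSize, totalChunks, nil
}

func main() {
	size, n, err := chunkPlan(100, 1024*1024, 19, 50)
	fmt.Println(size, n, err)
}
```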





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961505608


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {

Review Comment:
   Do you mean testing `genMetadata`? It is just a simple generator function for `MessageMetadata`. Which aspects should a test for it cover?





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961495238


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	mm := p.genMetadata(msg, uncompressedSize)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	// todo: deliverAt has calculated in genMetadata but it's not a good idea to make genMetadata() return it.
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// final check for batching message is in serializeMessage
+		// this is a double check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// set ChunkMaxMessageSize
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+	}
+
+	// correct limit queue when chunked
+	for i := 0; i < totalChunks-1; i++ {
+		if !p.canAddToQueue(request) {
+			return
+		}
+	}
+
+	// set total chunks to send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if msg.SequenceID != nil {
+			mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+		} else {
+			mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+		}
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:         request.ctx,
+					msg:         request.msg,
+					callback:    request.callback,
+					publishTime: request.publishTime,
+					totalChunks: totalChunks,
+					chunkID:     chunkID,
+					uuid:        uuid,
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+		} else {
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMetaMessage(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full.. flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
+
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize int) (mm *pb.MessageMetadata) {
+	mm = &pb.MessageMetadata{
+		ProducerName:     &p.producerName,
+		PublishTime:      proto.Uint64(internal.TimestampMillis(time.Now())),
+		ReplicateTo:      msg.ReplicationClusters,
+		UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
 	}
+	if deliverAt.UnixNano() > 0 {
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	return
+}
 
-	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Yes, chunked and single (non-batched) messages do not need `smm` (`SingleMessageMetadata`); it is only used for batched messages. See `org.apache.pulsar.common.protocol.Commands#serializeSingleMessageInBatchWithPayload(org.apache.pulsar.common.api.proto.MessageMetadata, io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)`.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961499351


##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is not closed.

Review Comment:
   OK, I will modify this comment.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983281714


##########
pulsar/consumer_impl.go:
##########
@@ -91,6 +90,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		}
 	}
 
+	if options.MaxPendingChunkedMessage == 0 {
+		options.MaxPendingChunkedMessage = 100
+	}
+
+	// the minimum timer interval is 100ms

Review Comment:
   Sorry, this minimum value was originally for `timewheel`. Given that `timewheel` has been removed, this minimum value is unnecessary.
   I will remove it.








[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983458942


##########
pulsar/consumer_partition.go:
##########
@@ -1475,3 +1609,210 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) refresh(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.removeChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.removeChunkMessage(uuid, c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) removeChunkMessage(uuid string, autoAck bool) {

Review Comment:
   Good suggestion, thx.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r963697232


##########
pulsar/producer_partition.go:
##########
@@ -717,6 +886,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 			for _, i := range pi.sendRequests {
 				sr := i.(*sendRequest)
 				if sr.msg != nil {
+					// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   OK, it's done.



##########
pulsar/producer_partition.go:
##########
@@ -911,6 +1077,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
 				p.metrics.MessagesPublished.Inc()
 				p.metrics.MessagesPending.Dec()
+				// todo: it's not correct. the size should be schemaed uncompressed payload size

Review Comment:
   OK, it's done.





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r983555072


##########
pulsar/message_chunking_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have fail")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestPublishChunkWithFailure(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(_brokerMaxMessageSize + 1),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	totalProducers := 5
+	producers := make([]Producer, 0, 20)
+	defer func() {
+		for _, p := range producers {
+			p.Close()
+		}
+	}()
+
+	clients := make([]Client, 0, 20)
+	defer func() {
+		for _, c := range clients {
+			c.Close()
+		}
+	}()
+
+	for j := 0; j < totalProducers; j++ {
+		pc, err := NewClient(ClientOptions{
+			URL: lookupURL,
+		})
+		assert.Nil(t, err)
+		clients = append(clients, pc)
+		producer, err := pc.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: 10,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, producer)
+		producers = append(producers, producer)
+	}
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                    topic,
+		Type:                     Exclusive,
+		SubscriptionName:         "chunk-subscriber",
+		MaxPendingChunkedMessage: 1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	totalMsgs := 40
+	wg := sync.WaitGroup{}
+	wg.Add(totalMsgs * totalProducers)
+	for i := 0; i < totalMsgs; i++ {
+		for j := 0; j < totalProducers; j++ {
+			p := producers[j]
+			go func() {
+				ID, err := p.Send(context.Background(), &ProducerMessage{
+					Payload: createTestMessagePayload(50),
+				})
+				assert.NoError(t, err)
+				assert.NotNil(t, ID)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+
+	received := 0
+	for i := 0; i < totalMsgs*totalProducers; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		if msg == nil || (err != nil && errors.Is(err, context.DeadlineExceeded)) {
+			break
+		}
+
+		received++
+
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	c, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		Type:                        Exclusive,
+		SubscriptionName:            "chunk-subscriber",
+		ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	uuid := "test-uuid"
+	chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+	chunkCtxMap.addIfAbsent(uuid, 2, 100)
+	ctx := chunkCtxMap.get(uuid)
+	assert.NotNil(t, ctx)
+
+	time.Sleep(400 * time.Millisecond)
+
+	ctx = chunkCtxMap.get(uuid)
+	assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		EnableChunking:          true,
+		DisableBatching:         true,
+		MaxPendingMessages:      10,
+		ChunkMaxMessageSize:     50,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1000),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	totalMessages := 5
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "default-seek",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	msgIDs := make([]MessageID, 0)
+	for i := 0; i < totalMessages; i++ {
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(100),
+		})
+		assert.NoError(t, err)
+		msgIDs = append(msgIDs, ID)
+	}
+
+	for i := 0; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	err = consumer.Seek(msgIDs[1])
+	assert.NoError(t, err)
+
+	for i := 1; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	// todo: add reader seek test when support reader read chunk message
+}
+
+func TestChunkAckAndNAck(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		Type:                Exclusive,
+		SubscriptionName:    "default-seek",
+		NackRedeliveryDelay: time.Second,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	content := createTestMessagePayload(100)
+
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: content,
+	})
+	assert.NoError(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	msg, err := consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+
+	consumer.Nack(msg)
+	time.Sleep(time.Second * 2)
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
+	msg, err = consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+}
+
+func TestChunkSize(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// the default message metadata size for string schema
+	// The proto messageMetaData size as following. (all with tag) (maxMessageSize = 1024 * 1024)
+	// | producerName | sequenceID | publishTime | uncompressedSize |
+	// | ------------ | ---------- | ----------- | ---------------- |
+	// | 6            | 2          | 7           | 4                |
+	payloadChunkSize := _brokerMaxMessageSize - 19
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Name:            "test",
+		Topic:           topic,
+		EnableChunking:  true,
+		DisableBatching: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ {
+		msgID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(size),
+		})
+		assert.NoError(t, err)
+		if size <= payloadChunkSize {
+			_, ok := msgID.(messageID)
+			assert.Equal(t, true, ok)
+		} else {
+			_, ok := msgID.(chunkMessageID)
+			assert.Equal(t, true, ok)
+		}
+	}
+}
+
+func TestChunkMultiTopicConsumerReceive(t *testing.T) {
+	topic1 := newTopicName()
+	topic2 := newTopicName()
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	topics := []string{topic1, topic2}
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topics:           topics,
+		SubscriptionName: "multi-topic-sub",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	maxSize := 50
+
+	// produce messages
+	for i, topic := range topics {
+		p, err := client.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: uint(maxSize),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = genMessages(p, 10, func(idx int) string {
+			return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, string(createTestMessagePayload(100)))
+		})
+		p.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	receivedTopic1 := 0
+	receivedTopic2 := 0
+	// nolint
+	for receivedTopic1+receivedTopic2 < 20 {
+		select {
+		case cm, ok := <-consumer.Chan():
+			if ok {
+				msg := string(cm.Payload())
+				if strings.HasPrefix(msg, "topic-1") {
+					receivedTopic1++
+				} else if strings.HasPrefix(msg, "topic-2") {
+					receivedTopic2++
+				}
+				consumer.Ack(cm.Message)
+			} else {
+				t.Fail()
+			}
+		}

Review Comment:
   Good suggestion, thx.





[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1003112020


##########
pulsar/producer_partition.go:
##########
@@ -279,7 +279,7 @@ func (p *partitionProducer) grabCnx() error {
 		return err
 	}
 
-	if !p.options.DisableBatching {
+	if !p.options.DisableBatching && p.batchBuilder == nil {

Review Comment:
   Why add `p.batchBuilder == nil`? This code block is not reachable.
   





[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1001506222


##########
pulsar/consumer_partition.go:
##########
@@ -733,17 +796,38 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		pc.metrics.BytesReceived.Add(float64(len(payload)))
 		pc.metrics.PrefetchedBytes.Add(float64(len(payload)))
 
-		msgID := newTrackingMessageID(
+		trackingMsgID := newTrackingMessageID(
 			int64(pbMsgID.GetLedgerId()),
 			int64(pbMsgID.GetEntryId()),
 			int32(i),
 			pc.partitionIdx,
 			ackTracker)
+		// set the consumer so we know how to ack the message id
+		trackingMsgID.consumer = pc
 
-		if pc.messageShouldBeDiscarded(msgID) {
-			pc.AckID(msgID)
+		if pc.messageShouldBeDiscarded(trackingMsgID) {
+			pc.AckID(trackingMsgID)
 			continue
 		}
+
+		var msgID MessageID
+		if isChunkedMsg {
+			ctx := pc.chunkedMsgCtxMap.get(msgMeta.GetUuid())
+			if ctx == nil {
+				// chunkedMsgCtxMap has closed because of consumer closed

Review Comment:
   Need to add a log here. 





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002951350


##########
pulsar/consumer_impl.go:
##########
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {

Review Comment:
   `trackingMessageID` does not record chunking information.
   
   For example, to `Ack()` a big message, all of its chunks need to be acked. With `trackingMessageID` alone, we cannot figure out which chunks (message IDs) need to be acked.
   
   `trackingMessageID` is designed for tracking batch messages, so it should not be the message ID type accepted by the methods exposed by `partitionConsumer`. I think the better way would be to accept `MessageID` as the message ID type in the `partitionConsumer` methods. However, only the necessary interfaces have been modified (`Ack`, `NAck` and `Seek`) to keep the changes minimal.
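
To illustrate the point about acking every chunk, here is a minimal sketch under simplified assumptions. `msgID`, `chunkedID`, and `idsToAck` are hypothetical names invented for this example; they are not the client's `messageID`, `chunkMessageID`, or `trackingMessageID` types, and a real ack would go to the broker rather than return a slice.

```go
package main

import "fmt"

// msgID is a hypothetical, simplified message ID (ledger and entry only).
type msgID struct {
	ledgerID, entryID int64
}

// chunkedID is a hypothetical ID for a reassembled message: it remembers
// the ID of every chunk so that all of them can be acknowledged together.
type chunkedID struct {
	chunks []msgID
}

// idsToAck returns the IDs that must be acknowledged for id.
// A plain msgID acks only itself; a chunkedID acks every chunk.
// Any other type yields nil.
func idsToAck(id interface{}) []msgID {
	switch v := id.(type) {
	case msgID:
		return []msgID{v}
	case chunkedID:
		// Copy so callers cannot mutate the stored chunk list.
		return append([]msgID(nil), v.chunks...)
	default:
		return nil
	}
}

func main() {
	big := chunkedID{chunks: []msgID{{1, 10}, {1, 11}, {1, 12}}}
	fmt.Println(len(idsToAck(big)))
	fmt.Println(len(idsToAck(msgID{1, 13})))
}
```

An ID type that carries only a single ledger/entry pair has nowhere to keep the per-chunk list, which is the gap the comment above describes.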





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r1002955090


##########
pulsar/consumer_partition.go:
##########
@@ -698,8 +746,23 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		}
 	}
 
+	isChunkedMsg := false
+	if msgMeta.GetNumChunksFromMsg() > 1 {
+		isChunkedMsg = true
+	}
+
+	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)

Review Comment:
   Good suggestion, Thx





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r970254111


##########
pulsar/internal/timewheel.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"container/list"
+	"log"
+	"time"
+)
+
+// TimeWheel can execute job after waiting given duration
+type TimeWheel struct {

Review Comment:
   Same as above





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973601872


##########
pulsar/producer_partition.go:
##########
@@ -745,28 +933,34 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-	batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
+	batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()

Review Comment:
   `errors.Is()` is used to check for internal errors. The original variable name `errors` conflicts with the `errors` package.
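
The naming conflict can be shown with a small sketch. `flush`, `countQueueFull`, and `errQueueFull` are hypothetical names made up for this example; only the rename from `errors` to `errs` and the use of `errors.Is` come from the comment above.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical sentinel error used only in this sketch.
var errQueueFull = errors.New("queue is full")

// flush is a hypothetical stand-in for a call that returns several errors.
func flush() []error {
	return []error{nil, fmt.Errorf("send failed: %w", errQueueFull)}
}

// countQueueFull counts results that wrap errQueueFull.
// If the slice were named `errors`, it would shadow the errors package in
// this scope and the errors.Is call below would not compile, so the slice
// is named errs instead.
func countQueueFull() int {
	errs := flush()
	n := 0
	for _, err := range errs {
		if errors.Is(err, errQueueFull) {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(countQueueFull())
}
```

In Go, a local identifier takes precedence over an imported package name for the rest of its scope, so the rename is required whenever the package is used in the same function.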





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #805: [Issue 456] Support chunking for big messages.

Posted by GitBox <gi...@apache.org>.
Gleiphir2769 commented on PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#issuecomment-1207174306

   > Based on the Java implementation (ConsumerImpl.java), chunking also requires changes on the consumer side to be able to assemble chunks into the original message. Are you going to add support on the consumer side?
   
   Hi, the consumer side is implemented here. Looking forward to your review. @zzzming 

