You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/09 07:14:30 UTC
[pulsar-client-go] branch master updated: fix: fix 879 (#902)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 055b00b fix: fix 879 (#902)
055b00b is described below
commit 055b00b83ccfef4ea12a593e3678cb4bdd18311c
Author: Jiaqi Shen <18...@163.com>
AuthorDate: Fri Dec 9 15:14:25 2022 +0800
fix: fix 879 (#902)
---
pulsar/consumer_test.go | 35 ++++++++++++++++++++++++++++++-----
pulsar/producer_partition.go | 6 ++++++
2 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 989b906..95462ba 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -804,12 +804,22 @@ func TestConsumerCompression(t *testing.T) {
topicName := newTopicName()
ctx := context.Background()
- producer, err := client.CreateProducer(ProducerOptions{
+ // enable batching
+ p1, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
})
assert.Nil(t, err)
- defer producer.Close()
+ defer p1.Close()
+
+ // disable batching
+ p2, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ CompressionType: LZ4,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer p2.Close()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
@@ -821,8 +831,16 @@ func TestConsumerCompression(t *testing.T) {
const N = 100
for i := 0; i < N; i++ {
- if _, err := producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ if _, err := p1.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < N; i++ {
+ if _, err := p2.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
}); err != nil {
t.Fatal(err)
}
@@ -831,7 +849,14 @@ func TestConsumerCompression(t *testing.T) {
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
- assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+ assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
+ consumer.Ack(msg)
+ }
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index fc564cb..a08ee04 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -574,6 +574,12 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
compressedSize = len(compressedPayload)
checkSize = compressedSize
+
+ // set the compress type in msgMetaData
+ compressionType := pb.CompressionType(p.options.CompressionType)
+ if compressionType != pb.CompressionType_NONE {
+ mm.Compression = &compressionType
+ }
} else {
// final check for batching message is in serializeMessage
// this is a double check