You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/03 02:20:42 UTC
[pulsar-client-go] branch master updated: [issue #807] dlq topic producer options (#809)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 68e4317 [issue #807] dlq topic producer options (#809)
68e4317 is described below
commit 68e43175f2c4e8eb3bf181a02b03c5382a09f1aa
Author: ming <it...@gmail.com>
AuthorDate: Fri Sep 2 22:20:36 2022 -0400
[issue #807] dlq topic producer options (#809)
* dlq topic producer options
* user defined producer options
* add another unit test
* retry letter queue producer use the same dlq producer option
Fixes #807
### Motivation
To customize producer options for DLQ topics.
### Modifications
Add DLQProducerOptions to consumer's DLQ policy.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
The existing DLQ test is enhanced to cover customized producer options.
### Does this pull request potentially affect one of the following parts:
A new producer options field, `DLQProuducerOptions`, is introduced for DLQ policy to govern the producer options.
```
ConsumerOptions{
Topics: topics,
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
DLQProducerOptions: ProducerOptions{
BatchingMaxPublishDelay: 100 * time.Millisecond,
},
},
```
- Dependencies (does it add or upgrade a dependency): no
- The public API: yes
- The schema: no
- The default values of configurations: yes (the existing lz4 compression type is the default.)
- The wire protocol: no
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (GoDocs)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
---
pulsar/consumer.go | 3 +++
pulsar/consumer_test.go | 28 ++++++++++++++++++++++++----
pulsar/dlq_router.go | 16 ++++++++++------
pulsar/retry_router.go | 14 +++++++++-----
4 files changed, 46 insertions(+), 15 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 2df1637..70d67ba 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -71,6 +71,9 @@ type DLQPolicy struct {
// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
DeadLetterTopic string
+ // ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic
+ ProducerOptions ProducerOptions
+
// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 20d4290..91404ad 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1025,6 +1025,19 @@ func TestConsumerReceiveErrAfterClose(t *testing.T) {
}
func TestDLQ(t *testing.T) {
+ DLQWithProducerOptions(t, nil)
+}
+
+func TestDLQWithProducerOptions(t *testing.T) {
+ DLQWithProducerOptions(t,
+ &ProducerOptions{
+ BatchingMaxPublishDelay: 100 * time.Millisecond,
+ BatchingMaxSize: 64 * 1024,
+ CompressionType: ZLib,
+ })
+}
+
+func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -1045,15 +1058,19 @@ func TestDLQ(t *testing.T) {
ctx := context.Background()
// create consumer
+ dlqPolicy := DLQPolicy{
+ MaxDeliveries: 3,
+ DeadLetterTopic: dlqTopic,
+ }
+ if prodOpt != nil {
+ dlqPolicy.ProducerOptions = *prodOpt
+ }
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
- DLQ: &DLQPolicy{
- MaxDeliveries: 3,
- DeadLetterTopic: dlqTopic,
- },
+ DLQ: &dlqPolicy,
})
assert.Nil(t, err)
defer consumer.Close()
@@ -1156,6 +1173,9 @@ func TestDLQMultiTopics(t *testing.T) {
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
+ ProducerOptions: ProducerOptions{
+ BatchingMaxPublishDelay: 100 * time.Millisecond,
+ },
},
})
assert.Nil(t, err)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index b28600f..966bff1 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -135,12 +135,16 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
// Retry to create producer indefinitely
backoff := &internal.Backoff{}
for {
- producer, err := r.client.CreateProducer(ProducerOptions{
- Topic: r.policy.DeadLetterTopic,
- CompressionType: LZ4,
- BatchingMaxPublishDelay: 100 * time.Millisecond,
- Schema: schema,
- })
+ opt := r.policy.ProducerOptions
+ opt.Topic = r.policy.DeadLetterTopic
+ opt.Schema = schema
+
+ // the origin code sets to LZ4 compression with no options
+ // so the new design allows compression type to be overwritten but still set lz4 by default
+ if r.policy.ProducerOptions.CompressionType == NoCompression {
+ opt.CompressionType = LZ4
+ }
+ producer, err := r.client.CreateProducer(opt)
if err != nil {
r.log.WithError(err).Error("Failed to create DLQ producer")
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 457d809..4d19ce2 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -125,11 +125,15 @@ func (r *retryRouter) getProducer() Producer {
// Retry to create producer indefinitely
backoff := &internal.Backoff{}
for {
- producer, err := r.client.CreateProducer(ProducerOptions{
- Topic: r.policy.RetryLetterTopic,
- CompressionType: LZ4,
- BatchingMaxPublishDelay: 100 * time.Millisecond,
- })
+ opt := r.policy.ProducerOptions
+ opt.Topic = r.policy.RetryLetterTopic
+ // the origin code sets to LZ4 compression with no options
+ // so the new design allows compression type to be overwritten but still set lz4 by default
+ if r.policy.ProducerOptions.CompressionType == NoCompression {
+ opt.CompressionType = LZ4
+ }
+
+ producer, err := r.client.CreateProducer(opt)
if err != nil {
r.log.WithError(err).Error("Failed to create RLQ producer")