You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/03 02:20:42 UTC

[pulsar-client-go] branch master updated: [issue #807] dlq topic producer options (#809)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 68e4317  [issue #807] dlq topic producer options (#809)
68e4317 is described below

commit 68e43175f2c4e8eb3bf181a02b03c5382a09f1aa
Author: ming <it...@gmail.com>
AuthorDate: Fri Sep 2 22:20:36 2022 -0400

    [issue #807] dlq topic producer options (#809)
    
    * dlq topic producer options
    
    * user defined producer options
    
    * add another unit test
    
    * retry letter queue producer uses the same dlq producer options
    
    Fixes #807
    
    ### Motivation
    
    To customize producer options for DLQ topics.
    
    ### Modifications
    
    Add a `ProducerOptions` field to the consumer's DLQ policy.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
    The existing DLQ test is enhanced to cover customized producer options.
    
    ### Does this pull request potentially affect one of the following parts:
    A new producer options field, `ProducerOptions`, is introduced in the DLQ policy to configure the producers used for the DLQ and RLQ topics.
    ```
    ConsumerOptions{
                    Topics:              topics,
                    DLQ: &DLQPolicy{
                            MaxDeliveries:   3,
                            DeadLetterTopic: dlqTopic,
                            ProducerOptions: ProducerOptions{
                                    BatchingMaxPublishDelay: 100 * time.Millisecond,
                            },
                    },
    }
    ```
      - Dependencies (does it add or upgrade a dependency): no
      - The public API: yes
      - The schema: no
      - The default values of configurations: yes (the existing lz4 compression type is the default.)
      - The wire protocol: no
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (GoDocs)
      - If a feature is not applicable for documentation, explain why?
      - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
---
 pulsar/consumer.go      |  3 +++
 pulsar/consumer_test.go | 28 ++++++++++++++++++++++++----
 pulsar/dlq_router.go    | 16 ++++++++++------
 pulsar/retry_router.go  | 14 +++++++++-----
 4 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 2df1637..70d67ba 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -71,6 +71,9 @@ type DLQPolicy struct {
 	// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
 	DeadLetterTopic string
 
+	// ProducerOptions holds the producer options used to produce messages to the DLQ and RLQ topics
+	ProducerOptions ProducerOptions
+
 	// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
 	RetryLetterTopic string
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 20d4290..91404ad 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1025,6 +1025,19 @@ func TestConsumerReceiveErrAfterClose(t *testing.T) {
 }
 
 func TestDLQ(t *testing.T) {
+	DLQWithProducerOptions(t, nil)
+}
+
+func TestDLQWithProducerOptions(t *testing.T) {
+	DLQWithProducerOptions(t,
+		&ProducerOptions{
+			BatchingMaxPublishDelay: 100 * time.Millisecond,
+			BatchingMaxSize:         64 * 1024,
+			CompressionType:         ZLib,
+		})
+}
+
+func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
@@ -1045,15 +1058,19 @@ func TestDLQ(t *testing.T) {
 	ctx := context.Background()
 
 	// create consumer
+	dlqPolicy := DLQPolicy{
+		MaxDeliveries:   3,
+		DeadLetterTopic: dlqTopic,
+	}
+	if prodOpt != nil {
+		dlqPolicy.ProducerOptions = *prodOpt
+	}
 	consumer, err := client.Subscribe(ConsumerOptions{
 		Topic:               topic,
 		SubscriptionName:    "my-sub",
 		NackRedeliveryDelay: 1 * time.Second,
 		Type:                Shared,
-		DLQ: &DLQPolicy{
-			MaxDeliveries:   3,
-			DeadLetterTopic: dlqTopic,
-		},
+		DLQ:                 &dlqPolicy,
 	})
 	assert.Nil(t, err)
 	defer consumer.Close()
@@ -1156,6 +1173,9 @@ func TestDLQMultiTopics(t *testing.T) {
 		DLQ: &DLQPolicy{
 			MaxDeliveries:   3,
 			DeadLetterTopic: dlqTopic,
+			ProducerOptions: ProducerOptions{
+				BatchingMaxPublishDelay: 100 * time.Millisecond,
+			},
 		},
 	})
 	assert.Nil(t, err)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index b28600f..966bff1 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -135,12 +135,16 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
 	// Retry to create producer indefinitely
 	backoff := &internal.Backoff{}
 	for {
-		producer, err := r.client.CreateProducer(ProducerOptions{
-			Topic:                   r.policy.DeadLetterTopic,
-			CompressionType:         LZ4,
-			BatchingMaxPublishDelay: 100 * time.Millisecond,
-			Schema:                  schema,
-		})
+		opt := r.policy.ProducerOptions
+		opt.Topic = r.policy.DeadLetterTopic
+		opt.Schema = schema
+
+		// The original code hard-coded LZ4 compression with no way to override it.
+		// The compression type can now be overridden, but LZ4 is still applied when it is left as NoCompression.
+		if r.policy.ProducerOptions.CompressionType == NoCompression {
+			opt.CompressionType = LZ4
+		}
+		producer, err := r.client.CreateProducer(opt)
 
 		if err != nil {
 			r.log.WithError(err).Error("Failed to create DLQ producer")
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 457d809..4d19ce2 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -125,11 +125,15 @@ func (r *retryRouter) getProducer() Producer {
 	// Retry to create producer indefinitely
 	backoff := &internal.Backoff{}
 	for {
-		producer, err := r.client.CreateProducer(ProducerOptions{
-			Topic:                   r.policy.RetryLetterTopic,
-			CompressionType:         LZ4,
-			BatchingMaxPublishDelay: 100 * time.Millisecond,
-		})
+		opt := r.policy.ProducerOptions
+		opt.Topic = r.policy.RetryLetterTopic
+		// The original code hard-coded LZ4 compression with no way to override it.
+		// The compression type can now be overridden, but LZ4 is still applied when it is left as NoCompression.
+		if r.policy.ProducerOptions.CompressionType == NoCompression {
+			opt.CompressionType = LZ4
+		}
+
+		producer, err := r.client.CreateProducer(opt)
 
 		if err != nil {
 			r.log.WithError(err).Error("Failed to create RLQ producer")