You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/25 05:48:53 UTC

[pulsar] 03/14: Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0eebaf1d00ea4a0cddd8464c3be26ef531387fa
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Fri Mar 18 12:05:26 2022 +0800

    Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    Fixes #14704
    
    ### Motivation
    
    When the user uses the function of DeadLetterPolicy, it is better than misoperation. MaxRedeliverCount may be set to 0. When it is set to 0, according to the current processing logic of the Java Client, the message will be pushed to the DeadLetter Topic every time.
    
    ### Modifications
    
    - When MaxRedeliverCount <= 0 in DeadLetterPolicy, we reset MaxRedeliverCount to default value
    
    (cherry picked from commit 601fbdd40eabfcd5d5519e0c5bcc20ae280a8e18)
---
 .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java     |  2 ++
 .../org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java | 10 ++++++++++
 .../apache/pulsar/websocket/AbstractWebSocketHandlerTest.java  |  3 ++-
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 471d4ba..dea946b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -428,6 +428,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             if (conf.getAckTimeoutMillis() == 0) {
                 conf.setAckTimeoutMillis(DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER);
             }
+
+            checkArgument(deadLetterPolicy.getMaxRedeliverCount() > 0, "MaxRedeliverCount must be > 0.");
             conf.setDeadLetterPolicy(deadLetterPolicy);
         }
         return this;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 13d63ba..36ea53f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -288,6 +289,15 @@ public class ConsumerBuilderImplTest {
                 .build());
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testRedeliverCountOfDeadLetterPolicy() {
+        consumerBuilderImpl.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(0)
+                .deadLetterTopic("test-dead-letter-topic")
+                .retryLetterTopic("test-retry-letter-topic")
+                .build());
+    }
+
     @Test
     public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
         consumerBuilderImpl.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 9bd9907..782e05e 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -369,11 +369,12 @@ public class AbstractWebSocketHandlerTest {
         consumerHandler.clearQueryParams();
         consumerHandler.putQueryParam("receiverQueueSize", "1001");
         consumerHandler.putQueryParam("deadLetterTopic", "dead-letter-topic");
+        consumerHandler.putQueryParam("maxRedeliverCount", "3");
 
         conf = consumerHandler.getConf();
         // receive queue size is the minimum value of default value (1000) and user defined value(1001)
         assertEquals(conf.getReceiverQueueSize(), 1000);
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
-        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 0);
+        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
 }