You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:15 UTC

[pulsar] 06/15: Process maxRedeliverCount is 0 of DeadLetterPolicy (#14706)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d73942b6ae67c1be6a3501d0d221e762df00bcd4
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Fri Mar 18 12:05:26 2022 +0800

    Process maxRedeliverCount is 0 of DeadLetterPolicy (#14706)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    Fixes #14704
    
    When using DeadLetterPolicy, a user may set MaxRedeliverCount to 0 by mistake. When it is set to 0, the current processing logic of the Java Client pushes the message to the DeadLetter Topic every time.
    
    - When MaxRedeliverCount <= 0 in DeadLetterPolicy, the consumer builder now rejects the policy with an IllegalArgumentException ("MaxRedeliverCount must be > 0.")
    
    (cherry picked from commit 601fbdd40eabfcd5d5519e0c5bcc20ae280a8e18)
---
 .../apache/pulsar/client/impl/ConsumerBuilderImpl.java  |  2 ++
 .../pulsar/client/impl/ConsumerBuilderImplTest.java     | 17 ++++++++++++++++-
 .../pulsar/websocket/AbstractWebSocketHandlerTest.java  |  3 ++-
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f902d765873..d01a2076d8d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -426,6 +426,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             if (conf.getAckTimeoutMillis() == 0) {
                 conf.setAckTimeoutMillis(DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER);
             }
+
+            checkArgument(deadLetterPolicy.getMaxRedeliverCount() > 0, "MaxRedeliverCount must be > 0.");
             conf.setDeadLetterPolicy(deadLetterPolicy);
         }
         return this;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index b2ce170b124..36ea53f31ef 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -283,6 +289,15 @@ public class ConsumerBuilderImplTest {
                 .build());
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testRedeliverCountOfDeadLetterPolicy() {
+        consumerBuilderImpl.deadLetterPolicy(DeadLetterPolicy.builder()
+                .maxRedeliverCount(0)
+                .deadLetterTopic("test-dead-letter-topic")
+                .retryLetterTopic("test-retry-letter-topic")
+                .build());
+    }
+
     @Test
     public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
         consumerBuilderImpl.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 9bd99076779..782e05ea625 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -369,11 +369,12 @@ public class AbstractWebSocketHandlerTest {
         consumerHandler.clearQueryParams();
         consumerHandler.putQueryParam("receiverQueueSize", "1001");
         consumerHandler.putQueryParam("deadLetterTopic", "dead-letter-topic");
+        consumerHandler.putQueryParam("maxRedeliverCount", "3");
 
         conf = consumerHandler.getConf();
         // receive queue size is the minimum value of default value (1000) and user defined value(1001)
         assertEquals(conf.getReceiverQueueSize(), 1000);
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
-        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 0);
+        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
 }