You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:55 UTC

[pulsar] 05/22: Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (#12451)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 509a59d5a76df01e18d0382712ea7029fe01f5ae
Author: YANGLiiN <ie...@qq.com>
AuthorDate: Mon Oct 25 12:17:57 2021 +0800

    Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (#12451)
    
    ### Motivation
    when reconsumer the message with the configuration maxRedeliveryCount > 1 ,eg deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build()):
    
    then when consumer the same message by the third time (RECONSUMETIMES=2) the REAL_TOPIC and the ORIGIN_MESSAGE_ID changed as the second message.
    
    but, this should not changed once the REAL_TOPIC ORIGIN_MESSAGE_ID property was written .
    ```
    2021-10-21T15:03:12,771+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
    2021-10-21T15:03:12,909+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:1:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:1:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
    2021-10-21T15:03:12,965+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:0:-1, DELAY_TIME=1000, RECONSUMETIMES=2}
    2021-10-21T15:03:13,026+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:10:-1, DELAY_TIME=1000, RECONSUMETIMES=3}
    ```
    Expected Results (REAL_TOPIC ORIGIN_MESSAGE_ID property equals the frist origin message)
    ```
    2021-10-21T15:27:01,390+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
    2021-10-21T15:27:01,479+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:0:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
    2021-10-21T15:27:01,547+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=2}
    2021-10-21T15:27:01,603+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=3}
    ```
    ### Modifications
    https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L652-L653
    
    propertiesMap.put() --> propertiesMap.putIfAbsent()
    
    add one testcase to verify the REAL_TOPIC ORIGIN_MESSAGE_ID property
    
    (cherry picked from commit 85178830c9bf7462729f6f5d8d9285dee5d2d93d)
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 105 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 +-
 2 files changed, 102 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index fc84a62..48d20c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -18,18 +18,19 @@
  */
 package org.apache.pulsar.client.api;
 
-import lombok.Cleanup;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertNull;
-
 @Test(groups = "broker-api")
 public class RetryTopicTest extends ProducerConsumerBase {
 
@@ -119,6 +120,100 @@ public class RetryTopicTest extends ProducerConsumerBase {
         checkConsumer.close();
     }
 
+    @Test
+    public void testRetryTopicProperties() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+
+        final int maxRedeliveryCount = 3;
+
+        final int sendMessages = 10;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        Set<String> originMessageIds = Sets.newHashSet();
+        for (int i = 0; i < sendMessages; i++) {
+            MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+            originMessageIds.add(msgId.toString());
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        Set<String> retryMessageIds = Sets.newHashSet();
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            // retry message
+            if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+                // check the REAL_TOPIC property
+                assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+                retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
+            }
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        // check the REAL_TOPIC property
+        assertEquals(retryMessageIds, originMessageIds);
+
+        int totalInDeadLetter = 0;
+        Set<String> deadLetterMessageIds = Sets.newHashSet();
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
+                    new String(message.getData()));
+            // dead letter message
+            if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+                // check the REAL_TOPIC property
+                assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+                deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
+            }
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        assertEquals(deadLetterMessageIds, originMessageIds);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
+                    new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
+
     //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
     @Test
     public void testRetryTopicNameForCompatibility () throws Exception {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 72167b2..d4a6e88 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -649,8 +649,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (message.getProperties() != null) {
             propertiesMap.putAll(message.getProperties());
         }
-        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
-        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
+        propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
         return propertiesMap;
     }