You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:55 UTC
[pulsar] 05/22: Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (#12451)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 509a59d5a76df01e18d0382712ea7029fe01f5ae
Author: YANGLiiN <ie...@qq.com>
AuthorDate: Mon Oct 25 12:17:57 2021 +0800
Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (#12451)
### Motivation
when reconsumer the message with the configuration maxRedeliveryCount > 1 ,eg deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build()):
then when consumer the same message by the third time (RECONSUMETIMES=2) the REAL_TOPIC and the ORIGIN_MESSAGE_ID changed as the second message.
but, this should not changed once the REAL_TOPIC ORIGIN_MESSAGE_ID property was written .
```
2021-10-21T15:03:12,771+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
2021-10-21T15:03:12,909+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:1:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:1:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
2021-10-21T15:03:12,965+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:0:-1, DELAY_TIME=1000, RECONSUMETIMES=2}
2021-10-21T15:03:13,026+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:10:-1, DELAY_TIME=1000, RECONSUMETIMES=3}
```
Expected Results (REAL_TOPIC ORIGIN_MESSAGE_ID property equals the frist origin message)
```
2021-10-21T15:27:01,390+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
2021-10-21T15:27:01,479+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:0:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
2021-10-21T15:27:01,547+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=2}
2021-10-21T15:27:01,603+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=3}
```
### Modifications
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L652-L653
propertiesMap.put() --> propertiesMap.putIfAbsent()
add one testcase to verify the REAL_TOPIC ORIGIN_MESSAGE_ID property
(cherry picked from commit 85178830c9bf7462729f6f5d8d9285dee5d2d93d)
---
.../apache/pulsar/client/api/RetryTopicTest.java | 105 ++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +-
2 files changed, 102 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index fc84a62..48d20c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -18,18 +18,19 @@
*/
package org.apache.pulsar.client.api;
-import lombok.Cleanup;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.util.RetryMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertNull;
-
@Test(groups = "broker-api")
public class RetryTopicTest extends ProducerConsumerBase {
@@ -119,6 +120,100 @@ public class RetryTopicTest extends ProducerConsumerBase {
checkConsumer.close();
}
+ @Test
+ public void testRetryTopicProperties() throws Exception {
+ final String topic = "persistent://my-property/my-ns/retry-topic";
+
+ final int maxRedeliveryCount = 3;
+
+ final int sendMessages = 10;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+ .topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ Set<String> originMessageIds = Sets.newHashSet();
+ for (int i = 0; i < sendMessages; i++) {
+ MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+ originMessageIds.add(msgId.toString());
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ Set<String> retryMessageIds = Sets.newHashSet();
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+ // retry message
+ if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+ // check the REAL_TOPIC property
+ assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+ retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
+ }
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ // check the REAL_TOPIC property
+ assertEquals(retryMessageIds, originMessageIds);
+
+ int totalInDeadLetter = 0;
+ Set<String> deadLetterMessageIds = Sets.newHashSet();
+ do {
+ Message message = deadLetterConsumer.receive();
+ log.info("dead letter consumer received message : {} {}", message.getMessageId(),
+ new String(message.getData()));
+ // dead letter message
+ if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+ // check the REAL_TOPIC property
+ assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
+ deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
+ }
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ assertEquals(deadLetterMessageIds, originMessageIds);
+
+ deadLetterConsumer.close();
+ consumer.close();
+
+ Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+ if (checkMessage != null) {
+ log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
+ new String(checkMessage.getData()));
+ }
+ assertNull(checkMessage);
+
+ checkConsumer.close();
+ }
+
//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
@Test
public void testRetryTopicNameForCompatibility () throws Exception {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 72167b2..d4a6e88 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -649,8 +649,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (message.getProperties() != null) {
propertiesMap.putAll(message.getProperties());
}
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
- propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
+ propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
+ propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
return propertiesMap;
}