You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/08/28 12:18:55 UTC
[pulsar] 01/02: [fix][client] Fix reach redeliverCount client can't send messages to DLQ (#17287)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eb77b522ba0931ed22b32be5999ea68ad6337f2f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Aug 27 12:38:08 2022 +0800
[fix][client] Fix reach redeliverCount client can't send messages to DLQ (#17287)
(cherry picked from commit 4a28c087fe1308ea4eabc104b3d4889b47316afe)
---
.../client/impl/TransactionEndToEndTest.java | 55 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 13 +++--
2 files changed, 64 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 72eef04bad9..e2683afa2df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -67,6 +68,7 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -1169,4 +1171,57 @@ public class TransactionEndToEndTest extends TransactionTestBase {
assertTrue(ex instanceof PulsarClientException.TimeoutException);
}
}
+
+ @Test
+ public void testSendTxnAckMessageToDLQ() throws Exception {
+ String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
+ String subName = "test";
+ String value = "test";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ // consumer can't receive the same message three times
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
+ .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+ topic, subName))
+ .subscriptionType(SubscriptionType.Shared)
+ .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+ .subscriptionName("test")
+ .subscribe();
+
+ producer.send(value.getBytes());
+ Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
+ .build().get();
+
+ // consumer receive the message the first time, redeliverCount = 0
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+ transaction.abort().get();
+
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build().get();
+
+ // consumer receive the message the second time, redeliverCount = 1, also can be received
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+ transaction.abort().get();
+
+ // consumer receive the message the third time, redeliverCount = 2,
+ // the message will be sent to DLQ, can't receive
+ assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+ assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4cb334b38e4..809e7c674b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1386,10 +1386,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
schema, redeliveryCount, consumerEpoch);
uncompressedPayload.release();
- if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null
- && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
- Collections.singletonList(message));
+ if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
+ if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+ Collections.singletonList(message));
+ if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
+ redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
+ return;
+ }
+ }
}
executeNotifyCallback(message);
} else {