You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/29 22:59:08 UTC
[pulsar] branch branch-2.11 updated: [fix][client] Fix the message present in incoming queue after go to DLQ (#17326)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new d053c76db80 [fix][client] Fix the message present in incoming queue after go to DLQ (#17326)
d053c76db80 is described below
commit d053c76db80b5c19b259c0827fcac527078838c4
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Aug 30 06:57:09 2022 +0800
[fix][client] Fix the message present in incoming queue after go to DLQ (#17326)
(cherry picked from commit 2c1d2130e65a075f5a0bb261f5bd617e88d2e945)
---
.../org/apache/pulsar/client/impl/TransactionEndToEndTest.java | 4 ++++
.../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 10 +++++++++-
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 1b114922c71..62406410b0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1223,6 +1223,8 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// the message will be sent to DLQ, can't receive
assertNull(consumer.receive(3, TimeUnit.SECONDS));
+ assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
+
assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}
@@ -1287,6 +1289,8 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// the message will be sent to DLQ, can't receive
assertNull(consumer.receive(3, TimeUnit.SECONDS));
+ assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
+
assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d8f85bcfbaa..5e84a30e867 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1392,6 +1392,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Collections.singletonList(message));
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
+ // The message is skipped due to reaching the max redelivery count,
+ // so we need to increase the available permits
+ increaseAvailablePermits(cnx);
return;
}
}
@@ -1559,6 +1562,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
+ // Skip the message which reaches the max redelivery count.
+ if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
+ skippedMessages++;
+ continue;
+ }
+
}
executeNotifyCallback(message);
}
@@ -1576,7 +1585,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
- return;
}
}
}