You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 12:54:02 UTC
[pulsar] 02/04: [fix][txn] fix ack with txn compute ackedCount error (#17016)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a0eb84ef1947d0fbb0594b393558162529c47828
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 1 12:29:04 2022 +0800
[fix][txn] fix ack with txn compute ackedCount error (#17016)
Co-authored-by: congbobo184 <co...@github.com>
(cherry picked from commit 176b0d6e9e0d647c611cfdd359e5088ccb58788c)
---
.../org/apache/pulsar/broker/service/Consumer.java | 25 ++++++------
.../client/impl/TransactionEndToEndTest.java | 46 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 14faddf9d63..af7bfed4e7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -496,25 +496,28 @@ public class Consumer {
LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
- PositionImpl position;
+ PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+ // acked count at least one
long ackedCount = 0;
- long batchSize = getBatchSize(msgId);
+ long batchSize = 0;
+ if (msgId.hasBatchSize()) {
+ batchSize = msgId.getBatchSize();
+ // ack batch messages set ackedCount = batchSize
+ ackedCount = msgId.getBatchSize();
+ positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+ } else {
+ // ack non-batch message set ackedCount = 1
+ ackedCount = 1;
+ positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+ }
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
ackSets[j] = msgId.getAckSetAt(j);
}
- position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
+ position.setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
- } else {
- position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
- ackedCount = batchSize;
- }
- if (msgId.hasBatchSize()) {
- positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
- } else {
- positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 273013d6f51..7ff26ddd309 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1108,6 +1108,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
+ @Test
+ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
+ final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
+ final String subName = "test";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName)
+ .subscribe();
+
+ // send 5 messages asynchronously; the 1-second batchingMaxPublishDelay is intended to group them into one batch
+ for (int i = 0; i < 5; i++) {
+ producer.sendAsync((i + "").getBytes(UTF_8));
+ }
+
+ List<MessageId> messageIds = new ArrayList<>();
+
+ // receive the 5 messages and collect their message ids in a list
+ for (int i = 0; i < 5; i++) {
+ messageIds.add(consumer.receive().getMessageId());
+ }
+
+ MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
+
+
+ // remove the entry from the broker-side consumer's pendingAcks directly; in practice a redelivery would remove it from pendingAcks
+ getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+ .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
+ .remove(messageId.ledgerId, messageId.entryId);
+
+ Transaction txn = getTxn();
+ consumer.acknowledgeAsync(messageIds.get(1), txn).get();
+
+ // one message was acked within the transaction, so the unacked count is expected to be 4
+ assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+ .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+ }
+
@Test
public void testSendTxnAckMessageToDLQ() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";