You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/01 06:44:49 UTC
[pulsar] branch branch-2.11 updated: [fix][txn] fix ack with txn compute ackedCount error (#17016)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new d9cd5f4d18f [fix][txn] fix ack with txn compute ackedCount error (#17016)
d9cd5f4d18f is described below
commit d9cd5f4d18fcae429626fc726b54bc19dd07d455
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 1 12:29:04 2022 +0800
[fix][txn] fix ack with txn compute ackedCount error (#17016)
Co-authored-by: congbobo184 <co...@github.com>
---
.../org/apache/pulsar/broker/service/Consumer.java | 25 ++++++------
.../client/impl/TransactionEndToEndTest.java | 46 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 85df9ff107c..8dfed3e36e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -531,25 +531,28 @@ public class Consumer {
LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
- PositionImpl position;
+ PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+ // acked count at least one
long ackedCount = 0;
- long batchSize = getBatchSize(msgId);
+ long batchSize = 0;
+ if (msgId.hasBatchSize()) {
+ batchSize = msgId.getBatchSize();
+ // ack batch messages set ackeCount = batchSize
+ ackedCount = msgId.getBatchSize();
+ positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+ } else {
+ // ack no batch message set ackedCount = 1
+ ackedCount = 1;
+ positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+ }
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
ackSets[j] = msgId.getAckSetAt(j);
}
- position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
+ position.setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
- } else {
- position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
- ackedCount = batchSize;
- }
- if (msgId.hasBatchSize()) {
- positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
- } else {
- positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 62406410b0b..37d9eb6967d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1172,6 +1172,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
+ @Test
+ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
+ final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
+ final String subName = "test";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName)
+ .subscribe();
+
+ // send 5 messages with one batch
+ for (int i = 0; i < 5; i++) {
+ producer.sendAsync((i + "").getBytes(UTF_8));
+ }
+
+ List<MessageId> messageIds = new ArrayList<>();
+
+ // receive the batch messages add to a list
+ for (int i = 0; i < 5; i++) {
+ messageIds.add(consumer.receive().getMessageId());
+ }
+
+ MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
+
+
+ // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck
+ getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+ .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
+ .remove(messageId.ledgerId, messageId.entryId);
+
+ Transaction txn = getTxn();
+ consumer.acknowledgeAsync(messageIds.get(1), txn).get();
+
+ // ack one message, the unack count is 4
+ assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+ .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+ }
+
@Test
public void testSendTxnAckMessageToDLQ() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";