You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:12:31 UTC
[pulsar] branch branch-2.10 updated: [fix][txn] Fix transaction ack batch message (#15875)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2cd29dce5a0 [fix][txn] Fix transaction ack batch message (#15875)
2cd29dce5a0 is described below
commit 2cd29dce5a0eab8520509b2f38d31f09f85b3c84
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jun 7 12:45:00 2022 +0800
[fix][txn] Fix transaction ack batch message (#15875)
Fixes https://github.com/apache/pulsar/issues/15832
### Motivation
A transaction needs the batch size to help determine whether a batch message is in the pending ack state.
### Modifications
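When the message ID carries a batch size, use that batch size directly for the acked position; otherwise fall back to the previously computed batch size.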
(cherry picked from commit f87b3708ae6f05a8d4d4d6cd0db1090724fbcf4b)
---
.../org/apache/pulsar/broker/service/Consumer.java | 7 ++-
.../pendingack/PendingAckPersistentTest.java | 71 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index e7651706127..3f50ab7a7aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -496,8 +496,11 @@ public class Consumer {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = batchSize;
}
-
- positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+ if (msgId.hasBatchSize()) {
+ positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+ } else {
+ positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+ }
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 367c63797d2..35474505d1a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -46,8 +46,10 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -575,4 +577,73 @@ public class PendingAckPersistentTest extends TransactionTestBase {
assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
}
+
+ @Test
+ public void testTransactionConflictExceptionWhenAckBatchMessage() throws Exception { // acks one batched message in a transaction, commits, then expects a second transaction acking the same message ID to fail with TransactionConflictException
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "test").toString();
+
+ String subscriptionName = "my-subscription-batch";
+ pulsarServiceList.get(0).getBrokerService()
+ .getManagedLedgerConfig(TopicName.get(topic)).get()
+ .setDeletionAtBatchIndexLevelEnabled(true); // turn on batch-index-level deletion on the managed ledger config fetched for this topic
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3) // matches the three sendAsync calls below
+ // set batch max publish delay big enough to make sure entry has 3 messages
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .topic(topic).create();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionName(subscriptionName)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .isAckReceiptEnabled(true)
+ .topic(topic)
+ .subscribe();
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+ List<String> messages = new ArrayList<>(); // NOTE(review): filled below but never read in this test
+ for (int i = 0; i < 3; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+ MessageId messageId = futureMessageId.get(); // wait for each send to complete before consuming
+ messageIds.add(messageId);
+ }
+
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.DAYS)
+ .build()
+ .get();
+
+ Message<String> message1 = consumer.receive(); // received but not acknowledged anywhere in this test
+ Message<String> message2 = consumer.receive();
+
+ BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
+ consumer.acknowledgeAsync(messageId, transaction).get(); // ack message2 under the first transaction and wait for the result
+
+ Transaction transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.DAYS)
+ .build()
+ .get();
+ transaction.commit().get(); // commit the first transaction before re-acking the same message ID
+
+ try {
+ consumer.acknowledgeAsync(messageId, transaction2).get(); // same message ID, second transaction
+ fail(); // reached only if the second ack unexpectedly succeeds
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); // the wrapped cause must be the conflict exception
+ }
+ }
+
}