You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:12:31 UTC

[pulsar] branch branch-2.10 updated: [fix][txn]Fix transasction ack batch message (#15875)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2cd29dce5a0 [fix][txn]Fix transasction ack batch message (#15875)
2cd29dce5a0 is described below

commit 2cd29dce5a0eab8520509b2f38d31f09f85b3c84
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jun 7 12:45:00 2022 +0800

    [fix][txn]Fix transasction ack batch message (#15875)
    
    Fixes https://github.com/apache/pulsar/issues/15832
    
    ### Motivation
    The transaction needs batch size to help determine whether the batch message is in the pending ack state.
    
    ### Modifications
    Returns the batch size of messageID directly.
    
    (cherry picked from commit f87b3708ae6f05a8d4d4d6cd0db1090724fbcf4b)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  7 ++-
 .../pendingack/PendingAckPersistentTest.java       | 71 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index e7651706127..3f50ab7a7aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -496,8 +496,11 @@ public class Consumer {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
                 ackedCount = batchSize;
             }
-
-            positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            if (msgId.hasBatchSize()) {
+                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+            } else {
+                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 367c63797d2..35474505d1a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -46,8 +46,10 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -575,4 +577,73 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
 
     }
+
+    @Test
+    public void testTransactionConflictExceptionWhenAckBatchMessage() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "test").toString();
+
+        String subscriptionName = "my-subscription-batch";
+        pulsarServiceList.get(0).getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topic)).get()
+                .setDeletionAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .topic(topic).create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .isAckReceiptEnabled(true)
+                .topic(topic)
+                .subscribe();
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+
+        Message<String> message1 = consumer.receive();
+        Message<String> message2 = consumer.receive();
+
+        BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+        transaction.commit().get();
+
+        try {
+            consumer.acknowledgeAsync(messageId, transaction2).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+    }
+
 }