You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/07/13 06:33:24 UTC

[pulsar] branch master updated: [fix][txn] Transaction cumulative ack redeliver change (#14371)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6396bbe344 [fix][txn] Transaction cumulative ack redeliver change (#14371)
e6396bbe344 is described below

commit e6396bbe344867f8821e3f5975d385cd4e5b5251
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 13 14:33:19 2022 +0800

    [fix][txn] Transaction cumulative ack redeliver change (#14371)
    
    https://github.com/apache/pulsar/pull/10478
    
    ### Motivation
    since #10478 merged, we should change the cumulative ack with transaction abort redeliver logic. We can't redeliver unCumulativeAck message by the server because the client will receive the new message and ack then will receive the old message they abort.
    
    in this case:
    1. we have 5 message
    2. cumulative ack 3 messages with the transaction
    3. we abort this transaction
    4. server redeliver message by the current consumer_epoch
    5. the client will not filter the 4 or 5 messages, because in #10478 we don't change the client consumer epoch
    6. client cumulative ack 4 5 with transaction and commit will lose the 1 2 3 messages and the consume message, not in order.
    ### Modifications
    don't redeliver any cumulative ack messages, it will do by user self
---
 .../pendingack/impl/PendingAckHandleImpl.java      |  6 +-
 .../client/impl/TransactionEndToEndTest.java       | 69 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 15 -----
 .../client/impl/transaction/TransactionImpl.java   | 25 +-------
 4 files changed, 74 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 1a159974700..c74fb3e9217 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
-import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -528,9 +527,10 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                         if (cumulativeAckOfTransaction.getKey().equals(txnId)) {
                             cumulativeAckOfTransaction = null;
                         }
-                        //TODO: pendingAck handle next pr will fix
-                        persistentSubscription.redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
                         abortFuture.complete(null);
+
+                        // in cumulative ack with transaction, don't depend on server redeliver message,
+                        // it will cause the messages to be out of order
                     }).exceptionally(e -> {
                         log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!",
                                 topicName, txnId, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3a40982fed4..b372f0b61c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import io.netty.channel.ChannelHandlerContext;
@@ -1066,6 +1067,74 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testCumulativeAckRedeliverMessages() throws Exception {
+        String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";
+
+        int count = 5;
+        int transactionCumulativeAck = 3;
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        // send 5 messages
+        for (int i = 0; i < count; i++) {
+            producer.send((i + "").getBytes(UTF_8));
+        }
+
+        Transaction transaction = getTxn();
+        Transaction invalidTransaction = getTxn();
+
+        Message<byte[]> message = null;
+        for (int i = 0; i < transactionCumulativeAck; i++) {
+            message = consumer.receive();
+        }
+
+        // receive transaction in order
+        assertEquals(message.getValue(), (transactionCumulativeAck - 1 + "").getBytes(UTF_8));
+
+        // ack the last message
+        consumer.acknowledgeCumulativeAsync(message.getMessageId(), transaction).get();
+
+        // another ack will throw TransactionConflictException
+        try {
+            consumer.acknowledgeCumulativeAsync(message.getMessageId(), invalidTransaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+            // abort transaction then redeliver messages
+            transaction.abort().get();
+            // consumer redeliver messages
+            consumer.redeliverUnacknowledgedMessages();
+        }
+
+        // receive the rest of the message
+        for (int i = 0; i < count; i++) {
+            message = consumer.receive();
+        }
+
+        Transaction commitTransaction = getTxn();
+
+        // receive the first message
+        assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8));
+        // ack the end of the message
+        consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTransaction).get();
+
+        commitTransaction.commit().get();
+
+        // then redeliver will not receive any message
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(message);
+    }
+
     @Test
     public void testSendTxnMessageTimeout() throws Exception {
         String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index aeb1b9ed7db..4a60dad8bfc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -626,14 +626,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture;
         if (txn != null && this instanceof ConsumerImpl) {
-
-            // it is okay that we register acked topic after sending the acknowledgements. because
-            // the transactional ack will not be visiable for consumers until the transaction is
-            // committed
-            if (ackType == AckType.Cumulative) {
-                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
-            }
-
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
             // register the ackFuture as part of the transaction
@@ -649,13 +641,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture;
         if (txn != null && (this instanceof ConsumerImpl)) {
-            // it is okay that we register acked topic after sending the acknowledgements. because
-            // the transactional ack will not be visiable for consumers until the transaction is
-            // committed
-            if (ackType == AckType.Cumulative) {
-                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
-            }
-
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageId, ackType, properties, txn));
             // register the ackFuture as part of the transaction
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index aa7a18047de..55b20438693 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +36,6 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<String, CompletableFuture<Void>> registerPartitionMap;
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
-    private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
 
     private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
     private final ArrayList<CompletableFuture<Void>> ackFutureList;
@@ -122,9 +119,8 @@ public class TransactionImpl implements Transaction , TimerTask {
                     }
                 });
             }
-        } else {
-            return completableFuture;
         }
+        return completableFuture;
     }
 
     public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -147,22 +143,14 @@ public class TransactionImpl implements Transaction , TimerTask {
                     }
                 });
             }
-        } else {
-            return completableFuture;
         }
+        return completableFuture;
     }
 
     public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
         ackFutureList.add(ackFuture);
     }
 
-    public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
-        if (this.cumulativeAckConsumers == null) {
-            this.cumulativeAckConsumers = new HashMap<>();
-        }
-        cumulativeAckConsumers.put(consumer, 0);
-    }
-
     @Override
     public CompletableFuture<Void> commit() {
         timeout.cancel();
@@ -202,16 +190,7 @@ public class TransactionImpl implements Transaction , TimerTask {
                 if (e != null) {
                     log.error(e.getMessage());
                 }
-                if (cumulativeAckConsumers != null) {
-                    cumulativeAckConsumers.forEach((consumer, integer) ->
-                            cumulativeAckConsumers
-                                    .putIfAbsent(consumer, consumer.clearIncomingMessagesAndGetMessageNumber()));
-                }
                 tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
-                    if (cumulativeAckConsumers != null) {
-                        cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
-                        cumulativeAckConsumers.clear();
-                    }
 
                     if (ex != null) {
                         if (ex instanceof TransactionNotFoundException