You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/25 07:34:00 UTC

[pulsar] branch branch-2.10 updated: [feature][txn] Fix individual ack batch message with transaction abort redevlier duplicate messages (#14327)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 90b4f86922f [feature][txn] Fix individual ack batch message with transaction abort redevlier duplicate messages (#14327)
90b4f86922f is described below

commit 90b4f86922fff37b74e9dc572378cb8ba49f16af
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Feb 17 09:18:42 2023 +0800

    [feature][txn] Fix individual ack batch message with transaction abort redevlier duplicate messages (#14327)
    
    If individual ack batch message with transaction and abort this transaction, we will redeliver this message. but this batch message some bit sit are acked by another transaction and re consume this bit sit will produce `TransactionConflictException`, we don't need to redeliver this bit sit witch is acked by another transaction.
    
    if batch have batch size 5
    
    1. txn1 ack 0, 1     the ackSet is   00111
    2. txn2 ack 2 3 4 the ack Set is  11000
    3. abort txn2 redeliver this position is 00111
    4. but now we don't filter txn1 ackSet so redeliver this position bitSet is 111111
    When filter the message we should filter the bit sit witch is real ack or in pendingAck state
    add the test
    
    (cherry picked from commit e0c0d5e8785ae8933af1bcbb4ddea59f35644c05)
---
 .../mledger/util/PositionAckSetUtil.java           | 12 +++--
 .../broker/service/AbstractBaseDispatcher.java     | 25 +++++++++-
 .../service/persistent/PersistentSubscription.java |  9 +++-
 .../transaction/pendingack/PendingAckHandle.java   | 11 ++++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  5 ++
 .../pendingack/impl/PendingAckHandleImpl.java      | 11 ++++
 .../client/impl/TransactionEndToEndTest.java       | 58 +++++++++++++++++++++-
 7 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index 47d4bc2eea0..da3043e7458 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -46,12 +46,18 @@ public class PositionAckSetUtil {
         if (currentPosition == null || otherPosition == null) {
             return;
         }
-        BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(currentPosition.getAckSet());
-        BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet());
+        currentPosition.setAckSet(andAckSet(currentPosition.getAckSet(), otherPosition.getAckSet()));
+    }
+
+    //This method is do `and` operation for ack set
+    public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) {
+        BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(firstAckSet);
+        BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(secondAckSet);
         thisAckSet.and(otherAckSet);
-        currentPosition.setAckSet(thisAckSet.toLongArray());
+        long[] ackSet = thisAckSet.toLongArray();
         thisAckSet.recycle();
         otherAckSet.recycle();
+        return ackSet;
     }
 
     //This method is compare two position which position is bigger than another one.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index c9c0300da05..da6be55f8e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.service;
 
+import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -39,10 +40,12 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.service.plugin.FilterContext;
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -212,8 +215,28 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             batchSizes.setBatchSize(i, batchSize);
             long[] ackSet = null;
             if (indexesAcks != null && cursor != null) {
+                PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                 ackSet = cursor
-                        .getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                        .getDeletedBatchIndexesAsLongArray(position);
+                // some batch messages ack bit sit will be in pendingAck state, so don't send all bit sit to consumer
+                if (subscription instanceof PersistentSubscription
+                        && ((PersistentSubscription) subscription)
+                        .getPendingAckHandle() instanceof PendingAckHandleImpl) {
+                    PositionImpl positionInPendingAck =
+                            ((PersistentSubscription) subscription).getPositionInPendingAck(position);
+                    // if this position not in pendingAck state, don't need to do any op
+                    if (positionInPendingAck != null) {
+                        if (positionInPendingAck.hasAckSet()) {
+                            // need to or ackSet in pendingAck state and cursor ackSet which bit sit has been acked
+                            if (ackSet != null) {
+                                ackSet = andAckSet(ackSet, positionInPendingAck.getAckSet());
+                            } else {
+                                // if actSet is null, use pendingAck ackSet
+                                ackSet = positionInPendingAck.getAckSet();
+                            }
+                        }
+                    }
+                }
                 if (ackSet != null) {
                     indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                 } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c2999c1c741..ff1c654f1d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1091,6 +1091,9 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         return subscriptionProperties;
     }
 
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        return pendingAckHandle.getPositionInPendingAck(position);
+    }
     @Override
     public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
         Map<String, String> newSubscriptionProperties;
@@ -1104,7 +1107,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
                     this.subscriptionProperties = newSubscriptionProperties;
                 });
     }
-
     /**
      * Return a merged map that contains the cursor properties specified by used
      * (eg. when using compaction subscription) and the subscription properties.
@@ -1187,4 +1189,9 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
+
+    @VisibleForTesting
+    public PendingAckHandle getPendingAckHandle() {
+        return pendingAckHandle;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index e9984baf007..2d2418859cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -157,4 +157,15 @@ public interface PendingAckHandle {
      * @return if the PendingAckStore is init.
      */
     boolean checkIfPendingAckStoreInit();
+
+    /**
+     * If it returns null, it means this Position is not in pendingAck.
+     * <p>
+     * If it does not return null, it means this Position is in pendingAck and if it is batch Position,
+     * it will return the corresponding ackSet in pendingAck
+     *
+     * @param position {@link Position} witch need to get in pendingAck
+     * @return {@link Position} return the position in pendingAck
+     */
+    PositionImpl getPositionInPendingAck(PositionImpl position);
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 6a0aca6f9d7..46937b6666f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -100,4 +100,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
     public boolean checkIfPendingAckStoreInit() {
         return false;
     }
+
+    @Override
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        return null;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index d4a23817e63..aa2244a4bd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -987,6 +987,17 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
     }
 
+    @Override
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        if (individualAckPositions != null) {
+            MutablePair<PositionImpl, Integer> positionPair = this.individualAckPositions.get(position);
+            if (positionPair != null) {
+                return positionPair.getLeft();
+            }
+        }
+        return null;
+    }
+
     protected void handleCacheRequest() {
         while (true) {
             Runnable runnable = acceptQueue.poll();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index e3bbe1ad97b..b0a4b28bbdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -115,6 +115,62 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
+    @Test
+    private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Exception {
+        final String topicName = NAMESPACE1 + "/testIndividualAckAbortFilterAckSetInPendingAckState";
+        final int count = 9;
+        Producer<Integer> producer = pulsarClient
+                .newProducer(Schema.INT32)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(count).create();
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient
+                .newConsumer(Schema.INT32)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int i = 0; i < count; i++) {
+            producer.sendAsync(i);
+        }
+
+        Transaction firstTransaction = getTxn();
+
+        Transaction secondTransaction = getTxn();
+
+        // firstTransaction ack the first three messages and don't end the firstTransaction
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), firstTransaction).get();
+        }
+
+        // if secondTransaction abort we only can receive the middle three messages
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), secondTransaction).get();
+        }
+
+        // consumer normal ack the last three messages
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive()).get();
+        }
+
+        // if secondTransaction abort we only can receive the middle three messages
+        secondTransaction.abort().get();
+
+        // can receive 3 4 5 bit sit message
+        for (int i = 0; i < count / 3; i++) {
+            assertEquals(consumer.receive().getValue().intValue(), i + 3);
+        }
+
+        // can't receive message anymore
+        assertNull(consumer.receive(2, TimeUnit.SECONDS));
+    }
+
     @Test(dataProvider="enableBatch")
     private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup
@@ -641,7 +697,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
-    private Transaction getTxn() throws Exception {
+    public Transaction getTxn() throws Exception {
         return pulsarClient
                 .newTransaction()
                 .withTransactionTimeout(10, TimeUnit.SECONDS)