You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/04 07:05:00 UTC

[pulsar] branch master updated: [improve][txn] change delete pending ack position from foreach to firstKey (#16927)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b65fda7aa3 [improve][txn] change delete pending ack position from foreach to firstKey (#16927)
5b65fda7aa3 is described below

commit 5b65fda7aa32ca14d0ab2e2a657ea97870332196
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Aug 4 15:04:54 2022 +0800

    [improve][txn] change delete pending ack position from foreach to firstKey (#16927)
---
 .../pendingack/impl/PendingAckHandleImpl.java      | 12 +++--
 .../pendingack/PendingAckInMemoryDeleteTest.java   | 54 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index c74fb3e9217..b3aec6c67f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -95,7 +95,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
      *     <p>
      *         If it does not exits the map, the position will be added to the map.
      */
-    private Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
+    private ConcurrentSkipListMap<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
 
     /**
      * The map is for transaction with position witch was cumulative acked by this transaction.
@@ -884,12 +884,14 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
             individualAckPositions.remove(position);
         }
 
-        individualAckPositions.forEach((persistentPosition, positionIntegerMutablePair) -> {
-            if (persistentPosition.compareTo((PositionImpl) persistentSubscription
+        while (individualAckPositions.firstEntry() != null) {
+            if (individualAckPositions.firstKey().compareTo((PositionImpl) persistentSubscription
                     .getCursor().getMarkDeletedPosition()) < 0) {
-                individualAckPositions.remove(persistentPosition);
+                individualAckPositions.remove(individualAckPositions.firstKey());
+            } else {
+                break;
             }
-        });
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index da2a3a940bd..c35d15d96da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -47,6 +48,7 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -280,6 +282,56 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exception {
+        String normalTopic = NAMESPACE1 + "/testPendingAckClearPositionIsSmallerThanMarkDelete";
+        String subscriptionName = "test";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(normalTopic)
+                .subscriptionName(subscriptionName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(normalTopic)
+                .enableBatching(true)
+                .batchingMaxMessages(200)
+                .create();
+
+        // mark delete position
+        producer.send("test1".getBytes());
+
+        Transaction commitTxn = getTxn();
+
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get();
+
+        PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0)
+                .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get()
+                .getSubscription(subscriptionName), "pendingAckHandle");
+
+        Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions =
+                Whitebox.getInternalState(pendingAckHandle, "individualAckPositions");
+        // one message in pending ack state
+        assertEquals(1, individualAckPositions.size());
+
+        // put the PositionImpl.EARLIEST to the map
+        individualAckPositions.put(PositionImpl.EARLIEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+        // put the PositionImpl.LATEST to the map
+        individualAckPositions.put(PositionImpl.LATEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+        // three positions in pending ack state
+        assertEquals(3, individualAckPositions.size());
+
+        // committing this txn deletes the received position and PositionImpl.EARLIEST, but not PositionImpl.LATEST
+        commitTxn.commit().get();
+        assertEquals(1, individualAckPositions.size());
+    }
+
     private Transaction getTxn() throws Exception {
         return pulsarClient
                 .newTransaction()