You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/04 07:05:00 UTC
[pulsar] branch master updated: [improve][txn] change delete pending ack position from foreach to firstKey (#16927)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b65fda7aa3 [improve][txn] change delete pending ack position from foreach to firstKey (#16927)
5b65fda7aa3 is described below
commit 5b65fda7aa32ca14d0ab2e2a657ea97870332196
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Aug 4 15:04:54 2022 +0800
[improve][txn] change delete pending ack position from foreach to firstKey (#16927)
---
.../pendingack/impl/PendingAckHandleImpl.java | 12 +++--
.../pendingack/PendingAckInMemoryDeleteTest.java | 54 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index c74fb3e9217..b3aec6c67f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -95,7 +95,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
* <p>
* If it does not exist in the map, the position will be added to the map.
*/
- private Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
+ private ConcurrentSkipListMap<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions;
/**
* The map is for transactions with a position which was cumulatively acked by this transaction.
@@ -884,12 +884,14 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
individualAckPositions.remove(position);
}
- individualAckPositions.forEach((persistentPosition, positionIntegerMutablePair) -> {
- if (persistentPosition.compareTo((PositionImpl) persistentSubscription
+ while (individualAckPositions.firstEntry() != null) {
+ if (individualAckPositions.firstKey().compareTo((PositionImpl) persistentSubscription
.getCursor().getMarkDeletedPosition()) < 0) {
- individualAckPositions.remove(persistentPosition);
+ individualAckPositions.remove(individualAckPositions.firstKey());
+ } else {
+ break;
}
- });
+ }
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index da2a3a940bd..c35d15d96da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.broker.transaction.pendingack;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -47,6 +48,7 @@ import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -280,6 +282,56 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
}
}
+ @Test
+ public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exception {
+ String normalTopic = NAMESPACE1 + "/testPendingAckClearPositionIsSmallerThanMarkDelete";
+ String subscriptionName = "test";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(normalTopic)
+ .subscriptionName(subscriptionName)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(normalTopic)
+ .enableBatching(true)
+ .batchingMaxMessages(200)
+ .create();
+
+ // mark delete position
+ producer.send("test1".getBytes());
+
+ Transaction commitTxn = getTxn();
+
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get();
+
+ PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0)
+ .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get()
+ .getSubscription(subscriptionName), "pendingAckHandle");
+
+ Map<PositionImpl, MutablePair<PositionImpl, Integer>> individualAckPositions =
+ Whitebox.getInternalState(pendingAckHandle, "individualAckPositions");
+ // one message in pending ack state
+ assertEquals(1, individualAckPositions.size());
+
+ // put the PositionImpl.EARLIEST to the map
+ individualAckPositions.put(PositionImpl.EARLIEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+ // put the PositionImpl.LATEST to the map
+ individualAckPositions.put(PositionImpl.LATEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+ // three positions in pending ack state
+ assertEquals(3, individualAckPositions.size());
+
+ // committing this txn deletes the received position and PositionImpl.EARLIEST, but keeps PositionImpl.LATEST
+ commitTxn.commit().get();
+ assertEquals(1, individualAckPositions.size());
+ }
+
private Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()