You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:34 UTC
[pulsar] 04/17: [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa78cf8f45ba40f5e699f3510606db444846b323
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 21 17:15:56 2022 +0800
[optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)
(cherry picked from commit fcf5e148b055d617db37eef3c40d4004e74190a5)
---
.../buffer/impl/TopicTransactionBuffer.java | 72 ++++++-----
.../pendingack/impl/PendingAckHandleImpl.java | 49 ++++++--
.../buffer/TransactionLowWaterMarkTest.java | 140 ++++++++++++++++++++-
.../pendingack/PendingAckPersistentTest.java | 20 ++-
4 files changed, 223 insertions(+), 58 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c9cde544c80..6469ba9fb9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.SneakyThrows;
@@ -96,8 +97,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
+ /**
+ * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC.
+ */
private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+ private final Semaphore handleLowWaterMark = new Semaphore(1);
+
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
@@ -285,13 +291,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
- lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
- if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
- return lowWaterMark;
- } else {
- return oldLowWaterMark;
- }
- });
if (log.isDebugEnabled()) {
log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
}
@@ -332,13 +331,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
- lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
- if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
- return lowWaterMark;
- } else {
- return oldLowWaterMark;
- }
- });
if (log.isDebugEnabled()) {
log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
}
@@ -358,12 +350,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
synchronized (TopicTransactionBuffer.this) {
aborts.put(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
- handleLowWaterMark(txnID, lowWaterMark);
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
completableFuture.complete(null);
+ handleLowWaterMark(txnID, lowWaterMark);
}
@Override
@@ -384,30 +376,36 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
- if (!ongoingTxns.isEmpty()) {
- TxnID firstTxn = ongoingTxns.firstKey();
- if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
- ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
- firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
- try {
- topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData, Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- aborts.put(firstTxn, (PositionImpl) position);
- updateMaxReadPosition(firstTxn);
- }
- }
-
- @Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- log.error("Failed to abort low water mark for txn {}", txnID, exception);
- }
- }, null);
- } finally {
- abortMarker.release();
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
+ if (handleLowWaterMark.tryAcquire()) {
+ if (!ongoingTxns.isEmpty()) {
+ TxnID firstTxn = ongoingTxns.firstKey();
+ long tCId = firstTxn.getMostSigBits();
+ Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+ if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+ abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+ .thenRun(() -> {
+ log.warn("Successes to abort low water mark for txn [{}], topic [{}],"
+ + " lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ })
+ .exceptionally(ex -> {
+ log.warn("Failed to abort low water mark for txn {}, topic [{}], "
+ + "lowWaterMark [{}], ", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId,
+ ex);
+ handleLowWaterMark.release();
+ return null;
+ });
+ return;
}
}
+ handleLowWaterMark.release();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7e11592e063..323a1414e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -29,10 +29,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -114,6 +116,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
+ /**
+ * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC.
+ */
+ private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+
+ private final Semaphore handleLowWaterMark = new Semaphore(1);
+
@Getter
private final ExecutorService internalPinnedExecutor;
@@ -595,20 +604,34 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
- if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
- TxnID firstTxn = individualAckOfTransaction.firstKey();
-
- if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
- && firstTxn.getLeastSigBits() <= lowWaterMark) {
- abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
- log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
- }).exceptionally(e -> {
- log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
- return null;
- });
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
+
+ if (handleLowWaterMark.tryAcquire()) {
+ if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
+ TxnID firstTxn = individualAckOfTransaction.firstKey();
+ long tCId = firstTxn.getMostSigBits();
+ Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+ if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+ abortTxn(firstTxn, null, lowWaterMarkOfFirstTxnId).thenRun(() -> {
+ log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ }).exceptionally(ex -> {
+ log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ return null;
+ });
+ return;
+ }
}
+ handleLowWaterMark.release();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index ba0659892b4..a1ff3e7d34a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -37,7 +37,9 @@ import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -71,7 +73,7 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionLowWaterMarkTest extends TransactionTestBase {
- private static final String TOPIC = NAMESPACE1 + "/test-topic";
+ private static final String TOPIC = "persistent://" + NAMESPACE1 + "/test-topic";
@BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
@@ -216,7 +218,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
- CompletableFuture<Optional<Topic>> completableFuture = topics.get("persistent://" + TOPIC);
+ CompletableFuture<Optional<Topic>> completableFuture = topics.get(TOPIC);
if (completableFuture != null) {
Optional<Topic> topic = completableFuture.get();
if (topic.isPresent()) {
@@ -327,4 +329,138 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
// no-op
}
}
+
+ @Test
+ public void testLowWaterMarkForDifferentTC() throws Exception {
+ String subName = "sub";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(TOPIC)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(TOPIC)
+ .subscriptionName(subName)
+ .subscribe();
+
+ Transaction txn1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn2.getTxnID().getMostSigBits() == txn1.getTxnID().getMostSigBits()) {
+ txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+ Transaction txn3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn3.getTxnID().getMostSigBits() != txn2.getTxnID().getMostSigBits()) {
+ txn3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+
+ Transaction txn4 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn4.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
+ txn4 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().send();
+ }
+
+ producer.newMessage(txn1).send();
+ producer.newMessage(txn2).send();
+ producer.newMessage(txn3).send();
+ producer.newMessage(txn4).send();
+
+ Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message1.getMessageId(), txn1);
+ Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message2.getMessageId(), txn2);
+ Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message3.getMessageId(), txn3);
+ Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message4.getMessageId(), txn4);
+
+ txn1.commit().get();
+ txn2.commit().get();
+
+ Field field = TransactionImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(txn1, TransactionImpl.State.OPEN);
+ field.set(txn2, TransactionImpl.State.OPEN);
+
+ producer.newMessage(txn1).send();
+ producer.newMessage(txn2).send();
+
+ Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message5.getMessageId(), txn1);
+ Message<byte[]> message6 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message6.getMessageId(), txn2);
+
+ txn3.commit().get();
+ TxnID txnID1 = txn1.getTxnID();
+ TxnID txnID2 = txn2.getTxnID();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(checkTxnIsOngoingInTP(txnID1, subName));
+ assertTrue(checkTxnIsOngoingInTP(txnID2, subName));
+ assertTrue(checkTxnIsOngoingInTB(txnID1));
+ assertTrue(checkTxnIsOngoingInTB(txnID2));
+ });
+
+ txn4.commit().get();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(checkTxnIsOngoingInTP(txnID1, subName));
+ assertFalse(checkTxnIsOngoingInTP(txnID2, subName));
+ assertFalse(checkTxnIsOngoingInTB(txnID1));
+ assertFalse(checkTxnIsOngoingInTB(txnID2));
+ });
+ }
+
+ private boolean checkTxnIsOngoingInTP(TxnID txnID, String subName) throws Exception {
+ PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(TopicName.get(TOPIC).toString(), false)
+ .get().get();
+
+ PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+
+ Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+ field1.setAccessible(true);
+ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+
+ Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+ return individualAckOfTransaction.containsKey(txnID);
+ }
+
+ private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
+ PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(TopicName.get(TOPIC).toString(), false)
+ .get().get();
+
+ TopicTransactionBuffer topicTransactionBuffer =
+ (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+ Field field3 = TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
+ field3.setAccessible(true);
+ LinkedMap<TxnID, PositionImpl> ongoingTxns =
+ (LinkedMap<TxnID, PositionImpl>) field3.get(topicTransactionBuffer);
+ return ongoingTxns.containsKey(txnID);
+
+ }
+
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 96b2b9c14bf..367c63797d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -542,24 +542,32 @@ public class PendingAckPersistentTest extends TransactionTestBase {
.get();
PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+ Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+ field1.setAccessible(true);
+ PendingAckHandleImpl oldPendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+ Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> oldIndividualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(oldPendingAckHandle);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(oldIndividualAckOfTransaction.size(), 0));
+
PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
method.setAccessible(true);
method.invoke(pendingAckHandle);
- Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
- field1.setAccessible(true);
- CompletableFuture<PendingAckStore> completableFuture =
- (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+ Field field3 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+ field3.setAccessible(true);
Awaitility.await().until(() -> {
+ CompletableFuture<PendingAckStore> completableFuture =
+ (CompletableFuture<PendingAckStore>) field3.get(pendingAckHandle);
completableFuture.get();
return true;
});
- Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
- field2.setAccessible(true);
+
LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
(LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);