You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:05 UTC
[pulsar] 23/31: [PIP-163][Txn]Add lowWaterMark check before appending entry to TB (#15424)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e3ba66efc82e722dfa370a0dbaabcb1e673b5406
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 13 09:38:32 2022 +0800
[PIP-163][Txn]Add lowWaterMark check before appending entry to TB (#15424)
Master Issue: [#15423](https://github.com/apache/pulsar/issues/15423)
### Motivation & Modification
Details can be found at https://github.com/apache/pulsar/issues/15423.
(cherry picked from commit 15d6907153007ffbf94a351a19df31763b0c6d5a)
---
.../broker/service/persistent/PersistentTopic.java | 8 +++--
.../buffer/impl/TopicTransactionBuffer.java | 24 +++++++++++++
.../buffer/TransactionLowWaterMarkTest.java | 40 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 6 ++--
5 files changed, 72 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a36176cdb45..1ef90c35841 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2956,8 +2956,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
- publishContext.completed(new NotAllowedException("Exceed maximum message size")
- , -1, -1);
+ publishContext.completed(new NotAllowedException("Exceed maximum message size"), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
@@ -2978,7 +2977,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
})
.exceptionally(throwable -> {
throwable = throwable.getCause();
- if (!(throwable instanceof ManagedLedgerException)) {
+ if (throwable instanceof NotAllowedException) {
+ publishContext.completed((NotAllowedException) throwable, -1, -1);
+ return null;
+ } else if (!(throwable instanceof ManagedLedgerException)) {
throwable = new ManagedLedgerException(throwable);
}
addFailed((ManagedLedgerException) throwable, publishContext);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a432b76f5fa..c9cde544c80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -25,6 +25,7 @@ import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.SneakyThrows;
@@ -95,6 +96,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
+ private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
@@ -240,6 +243,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+ Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
+ if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
+ completableFuture.completeExceptionally(new BrokerServiceException
+ .NotAllowedException("Transaction [" + txnId + "] has been ended. "
+ + "Please use a new transaction to send message."));
+ return completableFuture;
+ }
topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@@ -275,6 +285,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
if (log.isDebugEnabled()) {
log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
}
@@ -315,6 +332,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
if (log.isDebugEnabled()) {
log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 873509ff6bf..ba0659892b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -287,4 +288,43 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
fail();
}
}
+
+ @Test
+ public void testTBLowWaterMarkEndToEnd() throws Exception { // End-to-end check: a send that reuses an already-committed transaction must fail with NotAllowedException.
+ Transaction txn1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn2.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) { // Re-create txn2 until its most-significant bits equal txn1's; TopicTransactionBuffer keys its low water marks by these bits.
+ txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(TOPIC)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ producer.newMessage(txn1).send(); // One message per transaction before either is committed.
+ producer.newMessage(txn2).send();
+
+ txn1.commit().get();
+ txn2.commit().get(); // NOTE(review): presumably this commit carries a low water mark that covers txn1 - confirm against the coordinator.
+
+ Field field = TransactionImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(txn1, TransactionImpl.State.OPEN); // Force txn1's client-side state back to OPEN via reflection; presumably so the client itself does not reject the send - confirm.
+ try {
+ producer.newMessage(txn1).send();
+ fail(); // Reaching this line means the send unexpectedly succeeded.
+ } catch (PulsarClientException.NotAllowedException ignore) {
+ // Expected: the send with the committed transaction is rejected with NotAllowedException.
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 29d86d58d1d..edefb2fe3a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -692,7 +692,7 @@ public class ClientCnx extends PulsarHandler {
producers.get(producerId).terminated(this);
break;
case NotAllowedError:
- producers.get(producerId).recoverNotAllowedError(sequenceId);
+ producers.get(producerId).recoverNotAllowedError(sequenceId, sendError.getMessage());
break;
default:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 45d1416f898..0bf37a93a3a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1186,16 +1186,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
resendMessages(cnx, this.connectionHandler.getEpoch());
}
- protected synchronized void recoverNotAllowedError(long sequenceId) {
+ protected synchronized void recoverNotAllowedError(long sequenceId, String errorMsg) {
OpSendMsg op = pendingMessages.peek();
if (op != null && sequenceId == getHighestSequenceId(op)) {
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
try {
op.sendComplete(
- new PulsarClientException.NotAllowedException(
- format("The size of the message which is produced by producer %s to the topic "
- + "%s is not allowed", producerName, topic)));
+ new PulsarClientException.NotAllowedException(errorMsg));
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
producerName, sequenceId, t);