You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:05 UTC

[pulsar] 23/31: [PIP-163][Txn]Add lowWaterMark check before appending entry to TB (#15424)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e3ba66efc82e722dfa370a0dbaabcb1e673b5406
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 13 09:38:32 2022 +0800

    [PIP-163][Txn]Add lowWaterMark check before appending entry to TB (#15424)
    
    Master Issue: [#15423](https://github.com/apache/pulsar/issues/15423)
    ### Motivation & Modification
    Details can be found at https://github.com/apache/pulsar/issues/15423.
    
    (cherry picked from commit 15d6907153007ffbf94a351a19df31763b0c6d5a)
---
 .../broker/service/persistent/PersistentTopic.java |  8 +++--
 .../buffer/impl/TopicTransactionBuffer.java        | 24 +++++++++++++
 .../buffer/TransactionLowWaterMarkTest.java        | 40 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  6 ++--
 5 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a36176cdb45..1ef90c35841 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2956,8 +2956,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return;
         }
         if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
-            publishContext.completed(new NotAllowedException("Exceed maximum message size")
-                    , -1, -1);
+            publishContext.completed(new NotAllowedException("Exceed maximum message size"), -1, -1);
             decrementPendingWriteOpsAndCheck();
             return;
         }
@@ -2978,7 +2977,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         })
                         .exceptionally(throwable -> {
                             throwable = throwable.getCause();
-                            if (!(throwable instanceof ManagedLedgerException)) {
+                            if (throwable instanceof NotAllowedException) {
+                              publishContext.completed((NotAllowedException) throwable, -1, -1);
+                              return null;
+                            } else if (!(throwable instanceof ManagedLedgerException)) {
                                 throwable = new ManagedLedgerException(throwable);
                             }
                             addFailed((ManagedLedgerException) throwable, publishContext);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a432b76f5fa..c9cde544c80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -25,6 +25,7 @@ import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.SneakyThrows;
@@ -95,6 +96,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
 
+    private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
@@ -240,6 +243,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
+        if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
+            completableFuture.completeExceptionally(new BrokerServiceException
+                    .NotAllowedException("Transaction [" + txnId + "] has been ended. "
+                    + "Please use a new transaction to send message."));
+            return completableFuture;
+        }
         topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
             @Override
             public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@@ -275,6 +285,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
         }
@@ -315,6 +332,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 873509ff6bf..ba0659892b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -287,4 +288,43 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             fail();
         }
     }
+
+    @Test
+    public void testTBLowWaterMarkEndToEnd() throws Exception {
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn2.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
+            txn2 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(TOPIC)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        producer.newMessage(txn1).send();
+        producer.newMessage(txn2).send();
+
+        txn1.commit().get();
+        txn2.commit().get();
+
+        Field field = TransactionImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(txn1, TransactionImpl.State.OPEN);
+        try {
+            producer.newMessage(txn1).send();
+            fail();
+        } catch (PulsarClientException.NotAllowedException ignore) {
+            // no-op
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 29d86d58d1d..edefb2fe3a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -692,7 +692,7 @@ public class ClientCnx extends PulsarHandler {
             producers.get(producerId).terminated(this);
             break;
         case NotAllowedError:
-            producers.get(producerId).recoverNotAllowedError(sequenceId);
+            producers.get(producerId).recoverNotAllowedError(sequenceId, sendError.getMessage());
             break;
 
         default:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 45d1416f898..0bf37a93a3a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1186,16 +1186,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         resendMessages(cnx, this.connectionHandler.getEpoch());
     }
 
-    protected synchronized void recoverNotAllowedError(long sequenceId) {
+    protected synchronized void recoverNotAllowedError(long sequenceId, String errorMsg) {
         OpSendMsg op = pendingMessages.peek();
         if (op != null && sequenceId == getHighestSequenceId(op)) {
             pendingMessages.remove();
             releaseSemaphoreForSendOp(op);
             try {
                 op.sendComplete(
-                        new PulsarClientException.NotAllowedException(
-                                format("The size of the message which is produced by producer %s to the topic "
-                                        + "%s is not allowed", producerName, topic)));
+                        new PulsarClientException.NotAllowedException(errorMsg));
             } catch (Throwable t) {
                 log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
                         producerName, sequenceId, t);