You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:40 UTC

[pulsar] 01/19: [fix][txn] Fix pendingWriteOps not being decremented when an appended txn message is lower than lowWaterMark (#16266)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c95c98cdfad40962e229ee33161859d640dd03b5
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 28 23:33:24 2022 +0800

    [fix][txn] Fix pendingWriteOps not being decremented when an appended txn message is lower than lowWaterMark (#16266)
    
    (cherry picked from commit af7990dca4720c7f120b2d90ec368cdc82c484e0)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 1 +
 .../broker/transaction/buffer/TransactionLowWaterMarkTest.java    | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ad7903ba8d8..268ad1f0b8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2982,6 +2982,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                             throwable = throwable.getCause();
                             if (throwable instanceof NotAllowedException) {
                               publishContext.completed((NotAllowedException) throwable, -1, -1);
+                              decrementPendingWriteOpsAndCheck();
                               return null;
                             } else if (!(throwable instanceof ManagedLedgerException)) {
                                 throwable = new ManagedLedgerException(throwable);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index a1ff3e7d34a..5d91c16e76c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
@@ -61,6 +62,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -322,12 +324,18 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
         Field field = TransactionImpl.class.getDeclaredField("state");
         field.setAccessible(true);
         field.set(txn1, TransactionImpl.State.OPEN);
+
+        AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
+                        false).get().get(), "pendingWriteOps");
         try {
             producer.newMessage(txn1).send();
             fail();
         } catch (PulsarClientException.NotAllowedException ignore) {
             // no-op
         }
+
+        assertEquals(pendingWriteOps.get(), 0);
     }
 
     @Test