You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 04:17:09 UTC

[pulsar] branch branch-2.9 updated: [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps (#16266)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 99e4a1933d0 [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps (#16266)
99e4a1933d0 is described below

commit 99e4a1933d0dea4d2818cc4b218e820300820fb1
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 28 23:33:24 2022 +0800

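    [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps (#16266)
    
    When appending a transaction message is rejected with NotAllowedException
    (the "lower than lowWaterMark" case named in the title), PersistentTopic now
    also calls decrementPendingWriteOpsAndCheck() after completing the publish
    context, and the test asserts that pendingWriteOps is 0 after the failed send.
    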
    (cherry picked from commit af7990dca4720c7f120b2d90ec368cdc82c484e0)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 1 +
 .../broker/transaction/buffer/TransactionLowWaterMarkTest.java    | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8c592a7e29c..932def13db9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3083,6 +3083,7 @@ public class PersistentTopic extends AbstractTopic
                             throwable = throwable.getCause();
                             if (throwable instanceof NotAllowedException) {
                               publishContext.completed((NotAllowedException) throwable, -1, -1);
+                              decrementPendingWriteOpsAndCheck();
                               return null;
                             } else if (!(throwable instanceof ManagedLedgerException)) {
                                 throwable = new ManagedLedgerException(throwable);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 1f012d7a6b1..3efdc2473bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -59,6 +60,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -320,12 +322,18 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
         Field field = TransactionImpl.class.getDeclaredField("state");
         field.setAccessible(true);
         field.set(txn1, TransactionImpl.State.OPEN);
+
+        AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
+                        false).get().get(), "pendingWriteOps");
         try {
             producer.newMessage(txn1).send();
             fail();
         } catch (PulsarClientException.NotAllowedException ignore) {
             // no-op
         }
+
+        assertEquals(pendingWriteOps.get(), 0);
     }
 
     @Test