You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 04:17:09 UTC
[pulsar] branch branch-2.9 updated: [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps (#16266)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 99e4a1933d0 [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps (#16266)
99e4a1933d0 is described below
commit 99e4a1933d0dea4d2818cc4b218e820300820fb1
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 28 23:33:24 2022 +0800
[fix][txn] Fix pendingWriteOps not being decremented when an appended txn message is lower than lowWaterMark (#16266)
(cherry picked from commit af7990dca4720c7f120b2d90ec368cdc82c484e0)
---
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 +
.../broker/transaction/buffer/TransactionLowWaterMarkTest.java | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8c592a7e29c..932def13db9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3083,6 +3083,7 @@ public class PersistentTopic extends AbstractTopic
throwable = throwable.getCause();
if (throwable instanceof NotAllowedException) {
publishContext.completed((NotAllowedException) throwable, -1, -1);
+ decrementPendingWriteOpsAndCheck();
return null;
} else if (!(throwable instanceof ManagedLedgerException)) {
throwable = new ManagedLedgerException(throwable);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 1f012d7a6b1..3efdc2473bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -59,6 +60,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -320,12 +322,18 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(txn1, TransactionImpl.State.OPEN);
+
+ AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
+ false).get().get(), "pendingWriteOps");
try {
producer.newMessage(txn1).send();
fail();
} catch (PulsarClientException.NotAllowedException ignore) {
// no-op
}
+
+ assertEquals(pendingWriteOps.get(), 0);
}
@Test