You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:40 UTC
[pulsar] 01/19: [fix][txn] Fix pendingWriteOps not being decremented when an appended txn message is lower than lowWaterMark (#16266)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c95c98cdfad40962e229ee33161859d640dd03b5
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 28 23:33:24 2022 +0800
[fix][txn] Fix pendingWriteOps not being decremented when an appended txn message is lower than lowWaterMark (#16266)
(cherry picked from commit af7990dca4720c7f120b2d90ec368cdc82c484e0)
---
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 +
.../broker/transaction/buffer/TransactionLowWaterMarkTest.java | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ad7903ba8d8..268ad1f0b8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2982,6 +2982,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
throwable = throwable.getCause();
if (throwable instanceof NotAllowedException) {
publishContext.completed((NotAllowedException) throwable, -1, -1);
+ decrementPendingWriteOpsAndCheck();
return null;
} else if (!(throwable instanceof ManagedLedgerException)) {
throwable = new ManagedLedgerException(throwable);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index a1ff3e7d34a..5d91c16e76c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -61,6 +62,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -322,12 +324,18 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(txn1, TransactionImpl.State.OPEN);
+
+ AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
+ false).get().get(), "pendingWriteOps");
try {
producer.newMessage(txn1).send();
fail();
} catch (PulsarClientException.NotAllowedException ignore) {
// no-op
}
+
+ assertEquals(pendingWriteOps.get(), 0);
}
@Test