You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:29 UTC
[pulsar] 10/13: [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6c410fc8e4187f010845e9d344a61e817954617c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:54:17 2022 +0800
[Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
(cherry picked from commit 0a9fd913528181951fd6ad97d3ba07e11e77cd70)
---
.../buffer/impl/TopicTransactionBuffer.java | 1 -
.../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a64b0f7..d61bd28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -474,7 +474,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
maxReadPosition = position;
- changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 2e7fc5a..07a7c95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -796,4 +797,28 @@ public class TransactionTest extends TransactionTestBase {
timeout = (Timeout) field.get(transaction);
Assert.assertTrue(timeout.isCancelled());
}
+
+ @Test
+ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
+ PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(NAMESPACE1 + "/test", true)
+ .get().get();
+ TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+ Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+ field.setAccessible(true);
+ AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) field.get(buffer);
+ Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
+ field1.setAccessible(true);
+
+ Awaitility.await().untilAsserted(() -> {
+ TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
+ Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
+ });
+ Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+ buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+ Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+ }
}
\ No newline at end of file