You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:29 UTC

[pulsar] 10/13: [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6c410fc8e4187f010845e9d344a61e817954617c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:54:17 2022 +0800

    [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
    
    (cherry picked from commit 0a9fd913528181951fd6ad97d3ba07e11e77cd70)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  1 -
 .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a64b0f7..d61bd28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -474,7 +474,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
                 maxReadPosition = position;
-                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
                     maxReadPosition = position;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 2e7fc5a..07a7c95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -796,4 +797,28 @@ public class TransactionTest extends TransactionTestBase {
         timeout = (Timeout) field.get(transaction);
         Assert.assertTrue(timeout.isCancelled());
     }
+
+    @Test
+    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(NAMESPACE1 + "/test", true)
+                .get().get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+        field.setAccessible(true);
+        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) field.get(buffer);
+        Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
+        field1.setAccessible(true);
+
+        Awaitility.await().untilAsserted(() -> {
+                    TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
+                    Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
+        });
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+    }
 }
\ No newline at end of file