You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/20 09:38:20 UTC

[rocketmq] 01/02: fix: use the last TransactionData for the same transactionId

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d42a7f92cf42df6f9d771c200640a96ffb573566
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Jul 20 15:40:40 2022 +0800

    fix: use the last TransactionData for the same transactionId
---
 .../proxy/service/transaction/AbstractTransactionService.java      | 2 +-
 .../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
 .../proxy/service/transaction/TransactionDataManagerTest.java      | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 262e430c3..b55cc3905 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -57,7 +57,7 @@ public abstract class AbstractTransactionService implements TransactionService,
     @Override
     public EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId) {
-        TransactionData transactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(producerGroup, transactionId);
+        TransactionData transactionData = this.transactionDataManager.pollNoExpireTransactionData(producerGroup, transactionId);
         if (transactionData == null) {
             return null;
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
index 740afab3a..2c19b858b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
@@ -58,13 +58,13 @@ public class TransactionDataManager implements StartAndShutdown {
         });
     }
 
-    public TransactionData pollFirstNoExpireTransactionData(String producerGroup, String transactionId) {
+    public TransactionData pollNoExpireTransactionData(String producerGroup, String transactionId) {
         AtomicReference<TransactionData> res = new AtomicReference<>();
         long currTimestamp = System.currentTimeMillis();
         this.transactionIdDataMap.computeIfPresent(buildKey(producerGroup, transactionId), (key, dataSet) -> {
-            TransactionData data = dataSet.pollFirst();
+            TransactionData data = dataSet.pollLast();
             while (data != null && data.getExpireTime() < currTimestamp) {
-                data = dataSet.pollFirst();
+                data = dataSet.pollLast();
             }
             if (data != null) {
                 res.set(data);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
index 2d03ab6af..d9620740e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -65,7 +66,7 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
     }
 
     @Test
-    public void testPollFirst() {
+    public void testPoll() {
         String txId = MessageClientIDSetter.createUniqID();
         TransactionData transactionData1 = createTransactionData(txId, System.currentTimeMillis() - Duration.ofMinutes(2).toMillis());
         TransactionData transactionData2 = createTransactionData(txId);
@@ -73,9 +74,9 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData1);
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData2);
 
-        TransactionData resTransactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(PRODUCER_GROUP, txId);
+        TransactionData resTransactionData = this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId);
         assertSame(transactionData2, resTransactionData);
-        assertTrue(this.transactionDataManager.transactionIdDataMap.isEmpty());
+        assertNull(this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId));
     }
 
     @Test