You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/20 09:38:20 UTC
[rocketmq] 01/02: fix: use the last TransactionData for the same transactionId
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit d42a7f92cf42df6f9d771c200640a96ffb573566
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Jul 20 15:40:40 2022 +0800
fix: use the last TransactionData for the same transactionId
---
.../proxy/service/transaction/AbstractTransactionService.java | 2 +-
.../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
.../proxy/service/transaction/TransactionDataManagerTest.java | 7 ++++---
3 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 262e430c3..b55cc3905 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -57,7 +57,7 @@ public abstract class AbstractTransactionService implements TransactionService,
@Override
public EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback,
boolean fromTransactionCheck, String msgId, String transactionId) {
- TransactionData transactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(producerGroup, transactionId);
+ TransactionData transactionData = this.transactionDataManager.pollNoExpireTransactionData(producerGroup, transactionId);
if (transactionData == null) {
return null;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
index 740afab3a..2c19b858b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
@@ -58,13 +58,13 @@ public class TransactionDataManager implements StartAndShutdown {
});
}
- public TransactionData pollFirstNoExpireTransactionData(String producerGroup, String transactionId) {
+ public TransactionData pollNoExpireTransactionData(String producerGroup, String transactionId) {
AtomicReference<TransactionData> res = new AtomicReference<>();
long currTimestamp = System.currentTimeMillis();
this.transactionIdDataMap.computeIfPresent(buildKey(producerGroup, transactionId), (key, dataSet) -> {
- TransactionData data = dataSet.pollFirst();
+ TransactionData data = dataSet.pollLast();
while (data != null && data.getExpireTime() < currTimestamp) {
- data = dataSet.pollFirst();
+ data = dataSet.pollLast();
}
if (data != null) {
res.set(data);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
index 2d03ab6af..d9620740e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -65,7 +66,7 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
}
@Test
- public void testPollFirst() {
+ public void testPoll() {
String txId = MessageClientIDSetter.createUniqID();
TransactionData transactionData1 = createTransactionData(txId, System.currentTimeMillis() - Duration.ofMinutes(2).toMillis());
TransactionData transactionData2 = createTransactionData(txId);
@@ -73,9 +74,9 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData1);
this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData2);
- TransactionData resTransactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(PRODUCER_GROUP, txId);
+ TransactionData resTransactionData = this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId);
assertSame(transactionData2, resTransactionData);
- assertTrue(this.transactionDataManager.transactionIdDataMap.isEmpty());
+ assertNull(this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId));
}
@Test