You are viewing a plain text version of this content. The canonical link to the original was not preserved in this text version.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/20 09:38:19 UTC

[rocketmq] branch develop updated (e6b4e15eb -> d26ba7d43)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from e6b4e15eb Merge pull request #4225 from drpmma/feature/grpc-develop-v2
     new d42a7f92c fix: use the last TransactionData for the same transactionId
     new d26ba7d43 fix: unit test cases

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../proxy/service/transaction/AbstractTransactionService.java      | 2 +-
 .../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
 .../rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java   | 2 ++
 .../proxy/service/transaction/TransactionDataManagerTest.java      | 7 ++++---
 4 files changed, 10 insertions(+), 7 deletions(-)


[rocketmq] 01/02: fix: use the last TransactionData for the same transactionId

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d42a7f92cf42df6f9d771c200640a96ffb573566
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Jul 20 15:40:40 2022 +0800

    fix: use the last TransactionData for the same transactionId
---
 .../proxy/service/transaction/AbstractTransactionService.java      | 2 +-
 .../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
 .../proxy/service/transaction/TransactionDataManagerTest.java      | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 262e430c3..b55cc3905 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -57,7 +57,7 @@ public abstract class AbstractTransactionService implements TransactionService,
     @Override
     public EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId) {
-        TransactionData transactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(producerGroup, transactionId);
+        TransactionData transactionData = this.transactionDataManager.pollNoExpireTransactionData(producerGroup, transactionId);
         if (transactionData == null) {
             return null;
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
index 740afab3a..2c19b858b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
@@ -58,13 +58,13 @@ public class TransactionDataManager implements StartAndShutdown {
         });
     }
 
-    public TransactionData pollFirstNoExpireTransactionData(String producerGroup, String transactionId) {
+    public TransactionData pollNoExpireTransactionData(String producerGroup, String transactionId) {
         AtomicReference<TransactionData> res = new AtomicReference<>();
         long currTimestamp = System.currentTimeMillis();
         this.transactionIdDataMap.computeIfPresent(buildKey(producerGroup, transactionId), (key, dataSet) -> {
-            TransactionData data = dataSet.pollFirst();
+            TransactionData data = dataSet.pollLast();
             while (data != null && data.getExpireTime() < currTimestamp) {
-                data = dataSet.pollFirst();
+                data = dataSet.pollLast();
             }
             if (data != null) {
                 res.set(data);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
index 2d03ab6af..d9620740e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -65,7 +66,7 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
     }
 
     @Test
-    public void testPollFirst() {
+    public void testPoll() {
         String txId = MessageClientIDSetter.createUniqID();
         TransactionData transactionData1 = createTransactionData(txId, System.currentTimeMillis() - Duration.ofMinutes(2).toMillis());
         TransactionData transactionData2 = createTransactionData(txId);
@@ -73,9 +74,9 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData1);
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData2);
 
-        TransactionData resTransactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(PRODUCER_GROUP, txId);
+        TransactionData resTransactionData = this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId);
         assertSame(transactionData2, resTransactionData);
-        assertTrue(this.transactionDataManager.transactionIdDataMap.isEmpty());
+        assertNull(this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId));
     }
 
     @Test


[rocketmq] 02/02: fix: unit test cases

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d26ba7d433e05d7b3a518064c8ebf99c48df0bbd
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Wed Jul 20 15:58:53 2022 +0800

    fix: unit test cases
---
 .../apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index 7776d387e..117156b65 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -205,6 +205,8 @@ public class SendMessageActivityTest extends BaseActivityTest {
     @Test
     public void testBuildMessage() {
         long deliveryTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5);
+        ConfigurationManager.getProxyConfig().setMessageDelayLevel("1s 5s");
+        ConfigurationManager.getProxyConfig().initData();
         String msgId = MessageClientIDSetter.createUniqID();
 
         org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,