You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/11 07:30:09 UTC
[rocketmq] branch develop updated: [ISSUE #3957] Fix LmqConsumerOffsetManager deserialize error (#3958)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 40d9505 [ISSUE #3957] Fix LmqConsumerOffsetManager deserialize error (#3958)
40d9505 is described below
commit 40d950578643775000ba29a33afdc3d8746cb8b6
Author: tianliuliu <64...@qq.com>
AuthorDate: Fri Mar 11 15:29:53 2022 +0800
[ISSUE #3957] Fix LmqConsumerOffsetManager deserialize error (#3958)
LmqConsumerOffsetManager deserialize error (#3958)
---
.../broker/offset/LmqConsumerOffsetManager.java | 4 ++++
.../offset/LmqConsumerOffsetManagerTest.java | 27 ++++++++++++++++++++++
2 files changed, 31 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index 7e5d774..ec730d3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -28,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);
+ public LmqConsumerOffsetManager() {
+
+ }
+
public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java
index 6ec20c6..94290f6 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java
@@ -73,6 +73,33 @@ public class LmqConsumerOffsetManagerTest {
assertThat(offset1).isEqualTo(-1L);
}
+ @Test
+ public void testOffsetManage1() {
+ LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController);
+
+ String lmqTopicName = "%LMQ%1111";
+
+ String lmqGroupName = "%LMQ%GID_test";
+
+ lmqConsumerOffsetManager.commitOffset("127.0.0.1", lmqGroupName, lmqTopicName, 0, 10L);
+
+ lmqTopicName = "%LMQ%1222";
+
+ lmqGroupName = "%LMQ%GID_test222";
+
+ lmqConsumerOffsetManager.commitOffset("127.0.0.1", lmqGroupName, lmqTopicName, 0, 10L);
+ lmqConsumerOffsetManager.commitOffset("127.0.0.1","GID_test1", "MqttTest",0, 10L);
+
+ String json = lmqConsumerOffsetManager.encode(true);
+
+ LmqConsumerOffsetManager lmqConsumerOffsetManager1 = new LmqConsumerOffsetManager(brokerController);
+
+ lmqConsumerOffsetManager1.decode(json);
+
+ assertThat(lmqConsumerOffsetManager1.getOffsetTable().size()).isEqualTo(1);
+ assertThat(lmqConsumerOffsetManager1.getLmqOffsetTable().size()).isEqualTo(2);
+ }
+
@After
public void destroy() {
UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir()));