You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/24 07:33:54 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the serialize problem

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new a8ef92e  Fix the serialize problem
a8ef92e is described below

commit a8ef92e94dccbdbee52c86ae8a57620e1783d623
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 24 15:33:37 2021 +0800

    Fix the serialize problem
---
 .../apache/rocketmq/broker/BrokerController.java   |  5 +-
 .../broker/processor/AdminBrokerProcessor.java     |  2 +-
 .../broker/processor/PullMessageProcessor.java     |  2 +-
 .../broker/topic/TopicQueueMappingManager.java     | 11 ++--
 .../common/statictopic/LogicQueueMappingItem.java  | 33 +++++++++--
 .../statictopic/TopicQueueMappingContext.java      |  8 ++-
 .../statictopic/TopicQueueMappingDetail.java       | 65 +++++++++-------------
 .../common/statictopic/TopicQueueMappingInfo.java  | 14 ++++-
 .../common/statictopic/TopicQueueMappingOne.java   |  8 ++-
 .../common/statictopic/TopicQueueMappingUtils.java | 17 +++---
 .../common/statictopic/TopicQueueMappingTest.java  | 31 ++++-------
 .../tools/admin/DefaultMQAdminExtImpl.java         |  6 +-
 12 files changed, 110 insertions(+), 92 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9493c86..b1e98f7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -95,6 +95,7 @@ import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -1084,7 +1085,7 @@ public class BrokerController {
         Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
             .map(TopicConfig::getTopicName)
             .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
-                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo()))
+                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
                 .orElse(null))
             .filter(Objects::nonNull)
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -1103,7 +1104,7 @@ public class BrokerController {
         topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
 
         topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
-                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo())
+                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
         ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 443ae46..6fb9551 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -645,7 +645,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                     || !mappingDetail.getBname().equals(mappingItem.getBname())) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
-            ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
+            List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
             //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
             Long timestamp = requestHeader.getTimestamp();
             long offset = -1;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 24d3f07..7f704cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -187,7 +187,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             //handle max offset
             {
                 if (mappingItem.checkIfEndOffsetDecided()) {
-                    responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
+                    responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
                 } else {
                     responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
                 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index d5b76b8..4b9d328 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -93,8 +94,8 @@ public class TopicQueueMappingManager extends ConfigManager {
                 throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
             }
             for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
-                ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
-                ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
+                List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
+                List<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
                 if (newItems == null) {
                     //keep the old
                     newDetail.getHostedQueues().put(globalId, oldItems);
@@ -191,18 +192,18 @@ public class TopicQueueMappingManager extends ConfigManager {
             return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
         }
 
-        ImmutableList<LogicQueueMappingItem> mappingItemList = null;
+        List<LogicQueueMappingItem> mappingItemList = null;
         LogicQueueMappingItem mappingItem = null;
 
         if (globalOffset == null
                 || Long.MAX_VALUE == globalOffset) {
-            mappingItemList = mappingDetail.getMappingInfo(globalId);
+            mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
             if (mappingItemList != null
                 && mappingItemList.size() > 0) {
                 mappingItem = mappingItemList.get(mappingItemList.size() - 1);
             }
         } else {
-            mappingItemList = mappingDetail.getMappingInfo(globalId);
+            mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
             mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
         }
         return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index b87d2f1..16f41e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,16 +1,23 @@
 package org.apache.rocketmq.common.statictopic;
 
-public class LogicQueueMappingItem {
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-    private final int gen; // immutable
-    private final int queueId; //, immutable
-    private final String bname; //important, immutable
+public class LogicQueueMappingItem extends RemotingSerializable {
+
+    private int gen; // immutable
+    private int queueId; //, immutable
+    private String bname; //important, immutable
     private long logicOffset; // the start of the logic offset, important, can be changed by command only once
-    private final long startOffset; // the start of the physical offset, should always be 0, immutable
+    private long startOffset; // the start of the physical offset, should always be 0, immutable
     private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable
     private long timeOfStart = -1; // mutable, reserved
     private long timeOfEnd = -1; // mutable, reserved
 
+    //make sure it has a default constructor
+    public LogicQueueMappingItem() {
+
+    }
+
     public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
         this.gen = gen;
         this.queueId = queueId;
@@ -112,6 +119,22 @@ public class LogicQueueMappingItem {
         this.timeOfEnd = timeOfEnd;
     }
 
+    public void setGen(int gen) {
+        this.gen = gen;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public void setBname(String bname) {
+        this.bname = bname;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
     @Override
     public String toString() {
         return "LogicQueueMappingItem{" +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index b639c6a..d6d359d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -18,15 +18,17 @@ package org.apache.rocketmq.common.statictopic;
 
 import com.google.common.collect.ImmutableList;
 
+import java.util.List;
+
 public class TopicQueueMappingContext  {
     private String topic;
     private Integer globalId;
     private Long globalOffset;
     private TopicQueueMappingDetail mappingDetail;
-    private ImmutableList<LogicQueueMappingItem> mappingItemList;
+    private List<LogicQueueMappingItem> mappingItemList;
     private LogicQueueMappingItem mappingItem;
 
-    public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
+    public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
         this.topic = topic;
         this.globalId = globalId;
         this.globalOffset = globalOffset;
@@ -73,7 +75,7 @@ public class TopicQueueMappingContext  {
         this.mappingDetail = mappingDetail;
     }
 
-    public ImmutableList<LogicQueueMappingItem> getMappingItemList() {
+    public List<LogicQueueMappingItem> getMappingItemList() {
         return mappingItemList;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index b80aa9d..4a8bae3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.common.statictopic;
 
-import com.google.common.collect.ImmutableList;
-
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,50 +25,45 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
     // the mapping info in current broker, do not register to nameserver
     // make sure this value is not null
-    private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
-
-
+    private ConcurrentMap<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, List<LogicQueueMappingItem>>();
 
+    //make sure there is a default constructor
     public TopicQueueMappingDetail() {
 
     }
 
-
     public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
         super(topic, totalQueues, bname, epoch);
-        buildIdMap();
     }
 
-    public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
+    public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
         if (mappingInfo.isEmpty()) {
             return true;
         }
-        hostedQueues.put(globalId, mappingInfo);
-        buildIdMap();
+        mappingDetail.hostedQueues.put(globalId, mappingInfo);
         return true;
     }
 
-    public void buildIdMap() {
-        this.currIdMap = buildIdMap(LEVEL_0);
+    public static List<LogicQueueMappingItem> getMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+        return mappingDetail.hostedQueues.get(globalId);
     }
 
-
-    public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
+    public static ConcurrentMap<Integer, Integer> buildIdMap(TopicQueueMappingDetail mappingDetail, int level) {
         //level 0 means current leader in this broker
         //level 1 means previous leader in this broker, reserved for
         assert level == LEVEL_0 ;
 
-        if (hostedQueues == null || hostedQueues.isEmpty()) {
+        if (mappingDetail.hostedQueues == null || mappingDetail.hostedQueues.isEmpty()) {
             return new ConcurrentHashMap<Integer, Integer>();
         }
         ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
-        for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
+        for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry: mappingDetail.hostedQueues.entrySet()) {
             Integer globalId =  entry.getKey();
-            ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+            List<LogicQueueMappingItem> items = entry.getValue();
             if (level == LEVEL_0
                     && items.size() >= 1) {
                 LogicQueueMappingItem curr = items.get(items.size() - 1);
-                if (bname.equals(curr.getBname())) {
+                if (mappingDetail.bname.equals(curr.getBname())) {
                     tmpIdMap.put(globalId, curr.getQueueId());
                 }
             }
@@ -78,14 +71,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         return tmpIdMap;
     }
 
-    public ImmutableList<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
-        return hostedQueues.get(globalId);
-    }
-
-
 
-
-    public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList<LogicQueueMappingItem> mappingItems, long logicOffset) {
+    public static LogicQueueMappingItem findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long logicOffset) {
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
             return null;
@@ -106,8 +93,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         return null;
     }
 
-    public long computeMaxOffsetFromMapping(Integer globalId) {
-        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+    public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId);
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
             return -1;
@@ -117,24 +104,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     }
 
 
-    public TopicQueueMappingInfo cloneAsMappingInfo() {
-        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
-        topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
+    public static TopicQueueMappingInfo cloneAsMappingInfo(TopicQueueMappingDetail mappingDetail) {
+        TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(mappingDetail.topic, mappingDetail.totalQueues, mappingDetail.bname, mappingDetail.epoch);
+        topicQueueMappingInfo.currIdMap = TopicQueueMappingDetail.buildIdMap(mappingDetail, LEVEL_0);
         return topicQueueMappingInfo;
     }
 
-    public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
-        return hostedQueues;
+    public static boolean checkIfAsPhysical(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId);
+        return mappingItems == null
+                || (mappingItems.size() == 1
+                &&  mappingItems.get(0).getLogicOffset() == 0);
     }
 
-    public void setHostedQueues(ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> hostedQueues) {
-        this.hostedQueues = hostedQueues;
+    public ConcurrentMap<Integer, List<LogicQueueMappingItem>> getHostedQueues() {
+        return hostedQueues;
     }
 
-    public boolean checkIfAsPhysical(Integer globalId) {
-        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
-        return mappingItems == null
-                || (mappingItems.size() == 1
-                &&  mappingItems.get(0).getLogicOffset() == 0);
+    public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) {
+        this.hostedQueues = hostedQueues;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 39747b3..ba5af9b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -30,7 +30,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     long epoch; //important to fence the old dirty data
     boolean dirty; //indicate if the data is dirty
     //register to broker to construct the route
-    transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+    protected ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
 
     public TopicQueueMappingInfo() {
 
@@ -80,4 +80,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     public ConcurrentMap<Integer, Integer> getCurrIdMap() {
         return currIdMap;
     }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setBname(String bname) {
+        this.bname = bname;
+    }
+
+    public void setCurrIdMap(ConcurrentMap<Integer, Integer> currIdMap) {
+        this.currIdMap = currIdMap;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
index 644b335..d802575 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
@@ -19,14 +19,16 @@ package org.apache.rocketmq.common.statictopic;
 import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
+import java.util.List;
+
 public class TopicQueueMappingOne extends RemotingSerializable {
 
     String topic; // redundant field
     String bname;  //identify the hosted broker name
     Integer globalId;
-    ImmutableList<LogicQueueMappingItem> items;
+    List<LogicQueueMappingItem> items;
 
-    public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
+    public TopicQueueMappingOne(String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) {
         this.topic = topic;
         this.bname = bname;
         this.globalId = globalId;
@@ -45,7 +47,7 @@ public class TopicQueueMappingOne extends RemotingSerializable {
         return globalId;
     }
 
-    public ImmutableList<LogicQueueMappingItem> getItems() {
+    public List<LogicQueueMappingItem> getItems() {
         return items;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e83ed2a..6e6b15b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class TopicQueueMappingUtils {
 
@@ -168,7 +167,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
+    public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
@@ -198,7 +197,7 @@ public class TopicQueueMappingUtils {
     }
 
 
-    public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
+    public static void checkLogicQueueMappingItemOffset(List<LogicQueueMappingItem> items) {
         if (items == null
             || items.isEmpty()) {
             return;
@@ -248,7 +247,7 @@ public class TopicQueueMappingUtils {
             if (mappingDetail.totalQueues > maxNum) {
                 maxNum = mappingDetail.totalQueues;
             }
-            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  entry : mappingDetail.getHostedQueues().entrySet()) {
+            for (Map.Entry<Integer, List<LogicQueueMappingItem>>  entry : mappingDetail.getHostedQueues().entrySet()) {
                 Integer globalid = entry.getKey();
                 checkLogicQueueMappingItemOffset(entry.getValue());
                 String leaderBrokerName  = getLeaderBroker(entry.getValue());
@@ -278,10 +277,10 @@ public class TopicQueueMappingUtils {
         return globalIdMap;
     }
 
-    public static String getLeaderBroker(ImmutableList<LogicQueueMappingItem> items) {
+    public static String getLeaderBroker(List<LogicQueueMappingItem> items) {
         return getLeaderItem(items).getBname();
     }
-    public static LogicQueueMappingItem getLeaderItem(ImmutableList<LogicQueueMappingItem> items) {
+    public static LogicQueueMappingItem getLeaderItem(List<LogicQueueMappingItem> items) {
         assert items.size() > 0;
         return items.get(items.size() - 1);
     }
@@ -367,7 +366,7 @@ public class TopicQueueMappingUtils {
                 configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
             }
             LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
-            configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
+            TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem));
         }
 
         // set the topic config
@@ -458,8 +457,8 @@ public class TopicQueueMappingUtils {
 
             ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
             //Use the same object
-            mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
-            mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+            TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems);
+            TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems);
         }
 
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index b0cc5dd..d571f65 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -8,18 +8,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Map;
 
 public class TopicQueueMappingTest {
 
     @Test
-    public void testWriteToFile() {
-        System.out.println(System.getProperty("java.io.tmpdir"));
-        System.out.println(File.separator);
-    }
-
-    @Test
     public void testJsonSerialize() {
         LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
         String mappingItemJson = JSON.toJSONString(mappingItem) ;
@@ -29,35 +22,33 @@ public class TopicQueueMappingTest {
             Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
             Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
             Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
-            Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
             Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
             Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
             Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
             Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
+
         }
+        //test the decode encode
         {
-            String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false);
-            Assert.assertEquals(mappingItemJson, mappingItemJson2);
+            LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class);
+            Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false));
         }
         TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
-        mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
+        TopicQueueMappingDetail.putMappingInfo(mappingDetail, 0, ImmutableList.of(mappingItem));
 
         String mappingDetailJson = JSON.toJSONString(mappingDetail);
         {
             Map  mappingDetailMap = JSON.parseObject(mappingDetailJson);
-            Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
-            Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
-            Assert.assertEquals(6, mappingDetailMap.size());
+            Assert.assertTrue(mappingDetailMap.containsKey("currIdMap"));
+            Assert.assertEquals(7, mappingDetailMap.size());
             Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
             Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
         }
         {
-            System.out.println(mappingDetailJson);
-            TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
-            System.out.println(JSON.toJSONString(detailFromJson));
-
-            //Assert.assertEquals(1, detailFromJson.getHostedQueues().size());
-            //Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size());
+            TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
+            Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size());
+            Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size());
+            Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
         }
     }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 3c491cd..d375b13 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1137,8 +1137,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicStatsTable statsTable = examineTopicStats(addr, topic);
             TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
-            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
-                ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+            for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+                List<LogicQueueMappingItem> items = entry.getValue();
                 Integer globalId = entry.getKey();
                 if (items.size() < 2) {
                     continue;
@@ -1159,7 +1159,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
                 TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
                 //fresh the new leader
-                mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+                TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
             }
         }
         //Step4: write to the new leader with logic offset