You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/24 07:33:54 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the serialize probelm
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new a8ef92e Fix the serialize probelm
a8ef92e is described below
commit a8ef92e94dccbdbee52c86ae8a57620e1783d623
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 24 15:33:37 2021 +0800
Fix the serialize probelm
---
.../apache/rocketmq/broker/BrokerController.java | 5 +-
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../broker/processor/PullMessageProcessor.java | 2 +-
.../broker/topic/TopicQueueMappingManager.java | 11 ++--
.../common/statictopic/LogicQueueMappingItem.java | 33 +++++++++--
.../statictopic/TopicQueueMappingContext.java | 8 ++-
.../statictopic/TopicQueueMappingDetail.java | 65 +++++++++-------------
.../common/statictopic/TopicQueueMappingInfo.java | 14 ++++-
.../common/statictopic/TopicQueueMappingOne.java | 8 ++-
.../common/statictopic/TopicQueueMappingUtils.java | 17 +++---
.../common/statictopic/TopicQueueMappingTest.java | 31 ++++-------
.../tools/admin/DefaultMQAdminExtImpl.java | 6 +-
12 files changed, 110 insertions(+), 92 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9493c86..b1e98f7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -95,6 +95,7 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -1084,7 +1085,7 @@ public class BrokerController {
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
- .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo()))
+ .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -1103,7 +1104,7 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo())
+ entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 443ae46..6fb9551 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -645,7 +645,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
- ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
+ List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
//TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
Long timestamp = requestHeader.getTimestamp();
long offset = -1;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 24d3f07..7f704cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -187,7 +187,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//handle max offset
{
if (mappingItem.checkIfEndOffsetDecided()) {
- responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
+ responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
} else {
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index d5b76b8..4b9d328 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -93,8 +94,8 @@ public class TopicQueueMappingManager extends ConfigManager {
throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
}
for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
- ImmutableList<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
- ImmutableList<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
+ List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
+ List<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
if (newItems == null) {
//keep the old
newDetail.getHostedQueues().put(globalId, oldItems);
@@ -191,18 +192,18 @@ public class TopicQueueMappingManager extends ConfigManager {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
}
- ImmutableList<LogicQueueMappingItem> mappingItemList = null;
+ List<LogicQueueMappingItem> mappingItemList = null;
LogicQueueMappingItem mappingItem = null;
if (globalOffset == null
|| Long.MAX_VALUE == globalOffset) {
- mappingItemList = mappingDetail.getMappingInfo(globalId);
+ mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
if (mappingItemList != null
&& mappingItemList.size() > 0) {
mappingItem = mappingItemList.get(mappingItemList.size() - 1);
}
} else {
- mappingItemList = mappingDetail.getMappingInfo(globalId);
+ mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
}
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index b87d2f1..16f41e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,16 +1,23 @@
package org.apache.rocketmq.common.statictopic;
-public class LogicQueueMappingItem {
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
- private final int gen; // immutable
- private final int queueId; //, immutable
- private final String bname; //important, immutable
+public class LogicQueueMappingItem extends RemotingSerializable {
+
+ private int gen; // immutable
+ private int queueId; //, immutable
+ private String bname; //important, immutable
private long logicOffset; // the start of the logic offset, important, can be changed by command only once
- private final long startOffset; // the start of the physical offset, should always be 0, immutable
+ private long startOffset; // the start of the physical offset, should always be 0, immutable
private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable
private long timeOfStart = -1; // mutable, reserved
private long timeOfEnd = -1; // mutable, reserved
+ //make sure it has a default constructor
+ public LogicQueueMappingItem() {
+
+ }
+
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
this.gen = gen;
this.queueId = queueId;
@@ -112,6 +119,22 @@ public class LogicQueueMappingItem {
this.timeOfEnd = timeOfEnd;
}
+ public void setGen(int gen) {
+ this.gen = gen;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public void setBname(String bname) {
+ this.bname = bname;
+ }
+
+ public void setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ }
+
@Override
public String toString() {
return "LogicQueueMappingItem{" +
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index b639c6a..d6d359d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -18,15 +18,17 @@ package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
+import java.util.List;
+
public class TopicQueueMappingContext {
private String topic;
private Integer globalId;
private Long globalOffset;
private TopicQueueMappingDetail mappingDetail;
- private ImmutableList<LogicQueueMappingItem> mappingItemList;
+ private List<LogicQueueMappingItem> mappingItemList;
private LogicQueueMappingItem mappingItem;
- public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
+ public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
this.topic = topic;
this.globalId = globalId;
this.globalOffset = globalOffset;
@@ -73,7 +75,7 @@ public class TopicQueueMappingContext {
this.mappingDetail = mappingDetail;
}
- public ImmutableList<LogicQueueMappingItem> getMappingItemList() {
+ public List<LogicQueueMappingItem> getMappingItemList() {
return mappingItemList;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index b80aa9d..4a8bae3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.common.statictopic;
-import com.google.common.collect.ImmutableList;
-
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,50 +25,45 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
// make sure this value is not null
- private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
-
-
+ private ConcurrentMap<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, List<LogicQueueMappingItem>>();
+ //make sure there is a default constructor
public TopicQueueMappingDetail() {
}
-
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
super(topic, totalQueues, bname, epoch);
- buildIdMap();
}
- public boolean putMappingInfo(Integer globalId, ImmutableList<LogicQueueMappingItem> mappingInfo) {
+ public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
- hostedQueues.put(globalId, mappingInfo);
- buildIdMap();
+ mappingDetail.hostedQueues.put(globalId, mappingInfo);
return true;
}
- public void buildIdMap() {
- this.currIdMap = buildIdMap(LEVEL_0);
+ public static List<LogicQueueMappingItem> getMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+ return mappingDetail.hostedQueues.get(globalId);
}
-
- public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
+ public static ConcurrentMap<Integer, Integer> buildIdMap(TopicQueueMappingDetail mappingDetail, int level) {
//level 0 means current leader in this broker
//level 1 means previous leader in this broker, reserved for
assert level == LEVEL_0 ;
- if (hostedQueues == null || hostedQueues.isEmpty()) {
+ if (mappingDetail.hostedQueues == null || mappingDetail.hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry: hostedQueues.entrySet()) {
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry: mappingDetail.hostedQueues.entrySet()) {
Integer globalId = entry.getKey();
- ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+ List<LogicQueueMappingItem> items = entry.getValue();
if (level == LEVEL_0
&& items.size() >= 1) {
LogicQueueMappingItem curr = items.get(items.size() - 1);
- if (bname.equals(curr.getBname())) {
+ if (mappingDetail.bname.equals(curr.getBname())) {
tmpIdMap.put(globalId, curr.getQueueId());
}
}
@@ -78,14 +71,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return tmpIdMap;
}
- public ImmutableList<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
- return hostedQueues.get(globalId);
- }
-
-
-
- public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList<LogicQueueMappingItem> mappingItems, long logicOffset) {
+ public static LogicQueueMappingItem findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long logicOffset) {
if (mappingItems == null
|| mappingItems.isEmpty()) {
return null;
@@ -106,8 +93,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return null;
}
- public long computeMaxOffsetFromMapping(Integer globalId) {
- List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+ public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId);
if (mappingItems == null
|| mappingItems.isEmpty()) {
return -1;
@@ -117,24 +104,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
}
- public TopicQueueMappingInfo cloneAsMappingInfo() {
- TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
- topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
+ public static TopicQueueMappingInfo cloneAsMappingInfo(TopicQueueMappingDetail mappingDetail) {
+ TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(mappingDetail.topic, mappingDetail.totalQueues, mappingDetail.bname, mappingDetail.epoch);
+ topicQueueMappingInfo.currIdMap = TopicQueueMappingDetail.buildIdMap(mappingDetail, LEVEL_0);
return topicQueueMappingInfo;
}
- public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
- return hostedQueues;
+ public static boolean checkIfAsPhysical(TopicQueueMappingDetail mappingDetail, Integer globalId) {
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId);
+ return mappingItems == null
+ || (mappingItems.size() == 1
+ && mappingItems.get(0).getLogicOffset() == 0);
}
- public void setHostedQueues(ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> hostedQueues) {
- this.hostedQueues = hostedQueues;
+ public ConcurrentMap<Integer, List<LogicQueueMappingItem>> getHostedQueues() {
+ return hostedQueues;
}
- public boolean checkIfAsPhysical(Integer globalId) {
- List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
- return mappingItems == null
- || (mappingItems.size() == 1
- && mappingItems.get(0).getLogicOffset() == 0);
+ public void setHostedQueues(ConcurrentMap<Integer, List<LogicQueueMappingItem>> hostedQueues) {
+ this.hostedQueues = hostedQueues;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index 39747b3..ba5af9b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -30,7 +30,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
long epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
- transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
+ protected ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo() {
@@ -80,4 +80,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public void setBname(String bname) {
+ this.bname = bname;
+ }
+
+ public void setCurrIdMap(ConcurrentMap<Integer, Integer> currIdMap) {
+ this.currIdMap = currIdMap;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
index 644b335..d802575 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
@@ -19,14 +19,16 @@ package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import java.util.List;
+
public class TopicQueueMappingOne extends RemotingSerializable {
String topic; // redundant field
String bname; //identify the hosted broker name
Integer globalId;
- ImmutableList<LogicQueueMappingItem> items;
+ List<LogicQueueMappingItem> items;
- public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
+ public TopicQueueMappingOne(String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) {
this.topic = topic;
this.bname = bname;
this.globalId = globalId;
@@ -45,7 +47,7 @@ public class TopicQueueMappingOne extends RemotingSerializable {
return globalId;
}
- public ImmutableList<LogicQueueMappingItem> getItems() {
+ public List<LogicQueueMappingItem> getItems() {
return items;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e83ed2a..6e6b15b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
-import java.util.stream.Collectors;
public class TopicQueueMappingUtils {
@@ -168,7 +167,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
- public static void makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
+ public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems) {
if (oldItems == null || oldItems.isEmpty()) {
return;
}
@@ -198,7 +197,7 @@ public class TopicQueueMappingUtils {
}
- public static void checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> items) {
+ public static void checkLogicQueueMappingItemOffset(List<LogicQueueMappingItem> items) {
if (items == null
|| items.isEmpty()) {
return;
@@ -248,7 +247,7 @@ public class TopicQueueMappingUtils {
if (mappingDetail.totalQueues > maxNum) {
maxNum = mappingDetail.totalQueues;
}
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
checkLogicQueueMappingItemOffset(entry.getValue());
String leaderBrokerName = getLeaderBroker(entry.getValue());
@@ -278,10 +277,10 @@ public class TopicQueueMappingUtils {
return globalIdMap;
}
- public static String getLeaderBroker(ImmutableList<LogicQueueMappingItem> items) {
+ public static String getLeaderBroker(List<LogicQueueMappingItem> items) {
return getLeaderItem(items).getBname();
}
- public static LogicQueueMappingItem getLeaderItem(ImmutableList<LogicQueueMappingItem> items) {
+ public static LogicQueueMappingItem getLeaderItem(List<LogicQueueMappingItem> items) {
assert items.size() > 0;
return items.get(items.size() - 1);
}
@@ -367,7 +366,7 @@ public class TopicQueueMappingUtils {
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
- configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
+ TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem));
}
// set the topic config
@@ -458,8 +457,8 @@ public class TopicQueueMappingUtils {
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
- mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
- mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+ TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems);
+ TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems);
}
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index b0cc5dd..d571f65 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -8,18 +8,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
import java.util.Map;
public class TopicQueueMappingTest {
@Test
- public void testWriteToFile() {
- System.out.println(System.getProperty("java.io.tmpdir"));
- System.out.println(File.separator);
- }
-
- @Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
String mappingItemJson = JSON.toJSONString(mappingItem) ;
@@ -29,35 +22,33 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
- Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
+
}
+ //test the decode encode
{
- String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false);
- Assert.assertEquals(mappingItemJson, mappingItemJson2);
+ LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class);
+ Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false));
}
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
- mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
+ TopicQueueMappingDetail.putMappingInfo(mappingDetail, 0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
{
Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
- Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
- Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
- Assert.assertEquals(6, mappingDetailMap.size());
+ Assert.assertTrue(mappingDetailMap.containsKey("currIdMap"));
+ Assert.assertEquals(7, mappingDetailMap.size());
Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
}
{
- System.out.println(mappingDetailJson);
- TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
- System.out.println(JSON.toJSONString(detailFromJson));
-
- //Assert.assertEquals(1, detailFromJson.getHostedQueues().size());
- //Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size());
+ TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
+ Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size());
+ Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size());
+ Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
}
}
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 3c491cd..d375b13 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1137,8 +1137,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
- ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+ for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+ List<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
@@ -1159,7 +1159,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
- mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+ TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
}
}
//Step4: write to the new leader with logic offset