You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/17 15:29:08 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Use timestamp as the epoch to prevent some unknown problem
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new c673cb7 Use timestamp as the epoch to prevent some unknown problem
c673cb7 is described below
commit c673cb734eb6bf81b3c446f6b3816bb354091ace
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 17 23:28:14 2021 +0800
Use timestamp as the epoch to prevent some unknown problem
---
.../java/org/apache/rocketmq/common/TopicQueueMappingDetail.java | 2 +-
.../java/org/apache/rocketmq/common/TopicQueueMappingInfo.java | 8 ++++----
.../java/org/apache/rocketmq/common/TopicQueueMappingUtils.java | 8 ++++----
.../protocol/body/TopicConfigAndMappingSerializeWrapper.java | 2 +-
.../rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java | 7 ++++---
5 files changed, 14 insertions(+), 13 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 9b67751..188bfcc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
- public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) {
+ public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
super(topic, totalQueues, bname, epoch);
buildIdMap();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index b4bf776..f8c803c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
String topic; // redundant field
int totalQueues;
String bname; //identify the hosted broker name
- int epoch; //important to fence the old dirty data
+ long epoch; //important to fence the old dirty data
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
- public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) {
+ public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic;
this.totalQueues = totalQueues;
this.bname = bname;
@@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return topic;
}
- public int getEpoch() {
+ public long getEpoch() {
return epoch;
}
- public void setEpoch(int epoch) {
+ public void setEpoch(long epoch) {
this.epoch = epoch;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 87c54a3..686208a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -89,8 +89,8 @@ public class TopicQueueMappingUtils {
return new MappingAllocator(idToBroker, brokerNumMap);
}
- public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
- int epoch = -1;
+ public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
+ long epoch = -1;
int queueNum = 0;
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.getEpoch() > epoch) {
@@ -100,14 +100,14 @@ public class TopicQueueMappingUtils {
queueNum = mappingDetail.getTotalQueues();
}
}
- return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch, queueNum);
+ return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
}
public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
- return o2.getEpoch() - o1.getEpoch();
+ return (int)(o2.getEpoch() - o1.getEpoch());
}
});
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
index e6a34c4..ba494aa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
@@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW
public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
- return (TopicConfigAndMappingSerializeWrapper)wrapper;
+ return (TopicConfigAndMappingSerializeWrapper) wrapper;
}
TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1d3f6f..5cee8c8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
- Map.Entry<Integer, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
+ Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!existedTopicConfigMap.isEmpty()) {
//make sure it is not null
existedTopicConfigMap.forEach((key, value) -> {
@@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Integer, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
+ final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
@@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
- int epoch = maxEpochAndNum.getKey() + 1;
+ long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
newIdToBroker.forEach( (queueId, broker) -> {
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
@@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
});
+ //If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);