You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/17 15:29:08 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Use timestamp as the epoch to prevent some unknown problem

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new c673cb7  Use timestamp as the epoch to prevent some unknown problem
c673cb7 is described below

commit c673cb734eb6bf81b3c446f6b3816bb354091ace
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 17 23:28:14 2021 +0800

    Use timestamp as the epoch to prevent some unknown problem
---
 .../java/org/apache/rocketmq/common/TopicQueueMappingDetail.java  | 2 +-
 .../java/org/apache/rocketmq/common/TopicQueueMappingInfo.java    | 8 ++++----
 .../java/org/apache/rocketmq/common/TopicQueueMappingUtils.java   | 8 ++++----
 .../protocol/body/TopicConfigAndMappingSerializeWrapper.java      | 2 +-
 .../rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java | 7 ++++---
 5 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 9b67751..188bfcc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -29,7 +29,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     // make sure this value is not null
     private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
 
-    public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int epoch) {
+    public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
         super(topic, totalQueues, bname, epoch);
         buildIdMap();
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index b4bf776..f8c803c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -28,14 +28,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
     String topic; // redundant field
     int totalQueues;
     String bname;  //identify the hosted broker name
-    int epoch; //important to fence the old dirty data
+    long epoch; //important to fence the old dirty data
     boolean dirty; //indicate if the data is dirty
     //register to broker to construct the route
     transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
     //register to broker to help detect remapping failure
     transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
 
-    public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int epoch) {
+    public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
         this.topic = topic;
         this.totalQueues = totalQueues;
         this.bname = bname;
@@ -64,11 +64,11 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
         return topic;
     }
 
-    public int getEpoch() {
+    public long getEpoch() {
         return epoch;
     }
 
-    public void setEpoch(int epoch) {
+    public void setEpoch(long epoch) {
         this.epoch = epoch;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 87c54a3..686208a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -89,8 +89,8 @@ public class TopicQueueMappingUtils {
         return new MappingAllocator(idToBroker, brokerNumMap);
     }
 
-    public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
-        int epoch = -1;
+    public static Map.Entry<Long, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
+        long epoch = -1;
         int queueNum = 0;
         for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
             if (mappingDetail.getEpoch() > epoch) {
@@ -100,14 +100,14 @@ public class TopicQueueMappingUtils {
                 queueNum = mappingDetail.getTotalQueues();
             }
         }
-        return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch, queueNum);
+        return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
     }
 
     public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
         Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
             @Override
             public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
-                return o2.getEpoch() - o1.getEpoch();
+                return (int)(o2.getEpoch() - o1.getEpoch());
             }
         });
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
index e6a34c4..ba494aa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigAndMappingSerializeWrapper.java
@@ -35,7 +35,7 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW
 
     public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
         if (wrapper instanceof  TopicConfigAndMappingSerializeWrapper) {
-            return (TopicConfigAndMappingSerializeWrapper)wrapper;
+            return (TopicConfigAndMappingSerializeWrapper) wrapper;
         }
         TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper =  new TopicConfigAndMappingSerializeWrapper();
         mappingSerializeWrapper.setDataVersion(wrapper.getDataVersion());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1d3f6f..5cee8c8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -134,7 +134,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 }
             }
 
-            Map.Entry<Integer, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
+            Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
             if (!existedTopicConfigMap.isEmpty()) {
                 //make sure it it not null
                 existedTopicConfigMap.forEach((key, value) -> {
@@ -155,7 +155,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
                 //check the epoch and qnum
                 maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-                final Map.Entry<Integer, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
+                final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
                 detailList.forEach( mappingDetail -> {
                     if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
                         throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
@@ -200,7 +200,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
 
             //construct the topic configAndMapping
-            int epoch = maxEpochAndNum.getKey() + 1;
+            long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
             newIdToBroker.forEach( (queueId, broker) -> {
                 TopicConfigAndQueueMapping configMapping;
                 if (!existedTopicConfigMap.containsKey(broker)) {
@@ -218,6 +218,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
             });
 
+            //If some succeed, and others fail, it will cause inconsistent data
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
                 String broker = entry.getKey();
                 String addr = clientMetadata.findMasterBrokerAddr(broker);