You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/20 04:33:29 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the lock and persist for updateTopicQueueMapping

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new cf8b846  Polish the lock and persist for updateTopicQueueMapping
cf8b846 is described below

commit cf8b84659fbf5ca22c31fa336a615843f98fde01
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 20 12:33:15 2021 +0800

    Polish the lock and persist for updateTopicQueueMapping
---
 .../broker/processor/AdminBrokerProcessor.java     |  2 +-
 .../broker/topic/TopicQueueMappingManager.java     | 23 ++++++++++++++++++----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index bab484d..9048584 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
             response.setCode(ResponseCode.SUCCESS);
         } catch (Exception e) {
-            log.error("Update static failed for [{}]", request, e);
+            log.error("Update static topic failed for [{}]", request, e);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(e.getMessage());
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index e2b46fe..d5b76b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager {
         this.brokerController = brokerController;
     }
 
-    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
-        lock.lock();
+    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
+        boolean locked = false;
+        boolean updated = false;
         try {
+            if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                locked = true;
+            } else {
+                return;
+            }
             if (newDetail == null) {
                 return;
             }
@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
             TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
             if (oldDetail == null) {
                 topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                updated = true;
                 return;
             }
             if (force) {
@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                     newDetail.getHostedQueues().putIfAbsent(queueId, items);
                 });
                 topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                updated = true;
                 return;
             }
             //do more check
@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager {
                 }
             }
             topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
-        } finally {
-            lock.unlock();
+            updated = true;
+        }  finally {
+            if (locked) {
+                this.lock.unlock();
+            }
+            if (updated) {
+                this.persist();
+            }
         }
 
     }