You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/20 04:33:29 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the lock and persist for updateTopicQueueMapping
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new cf8b846 Polish the lock and persist for updateTopicQueueMapping
cf8b846 is described below
commit cf8b84659fbf5ca22c31fa336a615843f98fde01
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 20 12:33:15 2021 +0800
Polish the lock and persist for updateTopicQueueMapping
---
.../broker/processor/AdminBrokerProcessor.java | 2 +-
.../broker/topic/TopicQueueMappingManager.java | 23 ++++++++++++++++++----
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index bab484d..9048584 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -343,7 +343,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
- log.error("Update static failed for [{}]", request, e);
+ log.error("Update static topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index e2b46fe..d5b76b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -57,9 +58,15 @@ public class TopicQueueMappingManager extends ConfigManager {
this.brokerController = brokerController;
}
- public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) {
- lock.lock();
+ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean force) throws Exception {
+ boolean locked = false;
+ boolean updated = false;
try {
+ if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ locked = true;
+ } else {
+ return;
+ }
if (newDetail == null) {
return;
}
@@ -70,6 +77,7 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingDetail oldDetail = topicQueueMappingTable.get(newDetail.getTopic());
if (oldDetail == null) {
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ updated = true;
return;
}
if (force) {
@@ -77,6 +85,7 @@ public class TopicQueueMappingManager extends ConfigManager {
newDetail.getHostedQueues().putIfAbsent(queueId, items);
});
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+ updated = true;
return;
}
//do more check
@@ -94,8 +103,14 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
- } finally {
- lock.unlock();
+ updated = true;
+ } finally {
+ if (locked) {
+ this.lock.unlock();
+ }
+ if (updated) {
+ this.persist();
+ }
}
}