You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/09 10:21:43 UTC
[inlong] branch master updated: [INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d5345d01c [INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)
d5345d01c is described below
commit d5345d01cf28d83a7d68d365c6a4787d80415b49
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Oct 9 18:21:39 2022 +0800
[INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)
---
.../master/metamanage/DefaultMetaDataService.java | 109 +++++++++++++++++----
.../server/master/metamanage/MetaDataService.java | 23 ++---
.../metastore/dao/mapper/MetaConfigMapper.java | 8 ++
.../metastore/dao/mapper/TopicDeployMapper.java | 2 +
.../metastore/impl/AbsMetaConfigMapperImpl.java | 5 +
.../metastore/impl/AbsTopicDeployMapperImpl.java | 24 +++++
.../master/web/handler/WebTopicCtrlHandler.java | 22 ++---
7 files changed, 149 insertions(+), 44 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 273604b5c..d26d32810 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -233,7 +233,7 @@ public class DefaultMetaDataService implements MetaDataService {
if (!disableCsmTopicSet.isEmpty()) {
result.setFailResult(TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
strBuff.append("[unAuthorized Group] ").append(consumerId)
- .append("'s consumerGroup not authorized by administrator, unAuthorizedTopics : ")
+ .append("'s consumerGroup not authorized by administrator, unAuthorizedTopics: ")
.append(disableCsmTopicSet).toString());
strBuff.delete(0, strBuff.length());
return result.isSuccess();
@@ -432,8 +432,9 @@ public class DefaultMetaDataService implements MetaDataService {
// query the operated object
BrokerConfEntity curEntry = metaConfigMapper.getBrokerConfByBrokerId(brokerId);
if (curEntry == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- "The broker configure not exist!");
+ result.setFullInfo(true,
+ DataOpErrCode.DERR_SUCCESS.getCode(),
+ DataOpErrCode.DERR_SUCCESS.getDescription());
return new BrokerProcessResult(brokerId, "", result);
}
// check broker's manage status
@@ -501,8 +502,48 @@ public class DefaultMetaDataService implements MetaDataService {
return result.isSuccess();
}
runStatusInfo.notifyDataChanged();
- strBuff.append("[Meta data] triggered broker syncStatus info is ");
- logger.info(runStatusInfo.toJsonString(strBuff).toString());
+ logger.info(strBuff.append("[Meta data] trigger broker syncStatus info, brokerId is ")
+ .append(brokerId).toString());
+ strBuff.delete(0, strBuff.length());
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+
+ /**
+ * Notify the brokers on which the given topics are deployed that their configuration data has changed
+ *
+ * @param topicNameSet the topic name set
+ * @param strBuff the string buffer
+ * @param result the process return result
+ * @return true if success otherwise false
+ */
+ private boolean triggerBrokerConfDataSync(Set<String> topicNameSet,
+ StringBuilder strBuff,
+ ProcessResult result) {
+ if (!metaConfigMapper.checkStoreStatus(true, result)) {
+ return result.isSuccess();
+ }
+ Set<Integer> brokerIdSet =
+ metaConfigMapper.getDeployedBrokerIdByTopic(topicNameSet);
+ if (brokerIdSet.isEmpty()) {
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+ BrokerRunStatusInfo runStatusInfo;
+ BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+ for (Integer brokerId : brokerIdSet) {
+ if (brokerId == null) {
+ continue;
+ }
+ runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runStatusInfo == null) {
+ continue;
+ }
+ runStatusInfo.notifyDataChanged();
+ }
+ logger.info(strBuff.append("[Meta data] trigger broker syncStatus info for")
+ .append(" maxMsgSize modify, brokerId set is ").append(brokerIdSet)
+ .toString());
strBuff.delete(0, strBuff.length());
result.setSuccResult(null);
return result.isSuccess();
@@ -810,26 +851,56 @@ public class DefaultMetaDataService implements MetaDataService {
}
@Override
- public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
- String topicName, int topicNameId,
- Boolean enableTopicAuth, int maxMsgSizeInMB,
- StringBuilder strBuff, ProcessResult result) {
- TopicCtrlEntity entity =
- new TopicCtrlEntity(opEntity, topicName);
- entity.updModifyInfo(opEntity.getDataVerId(),
- topicNameId, maxMsgSizeInMB, enableTopicAuth);
- return addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+ public List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ Set<String> topicNameSet, int topicNameId,
+ Boolean enableTopicAuth, int maxMsgSizeInMB,
+ StringBuilder strBuff, ProcessResult result) {
+ TopicCtrlEntity entity;
+ Map<String, TopicCtrlEntity> topicCtrlEntityMap = new HashMap<>();
+ for (String topicName : topicNameSet) {
+ entity = new TopicCtrlEntity(opEntity, topicName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ topicNameId, maxMsgSizeInMB, enableTopicAuth);
+ topicCtrlEntityMap.put(topicName, entity);
+ }
+ return addOrUpdTopicCtrlConf(isAddOp, topicCtrlEntityMap, strBuff, result);
}
@Override
- public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
- StringBuilder strBuff, ProcessResult result) {
+ public List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp,
+ Map<String, TopicCtrlEntity> entityMap,
+ StringBuilder strBuff, ProcessResult result) {
+ List<TopicProcessResult> retInfo = new ArrayList<>();
// check current status
if (!metaConfigMapper.checkStoreStatus(true, result)) {
- return new TopicProcessResult(0, entity.getTopicName(), result);
+ for (String topicName : entityMap.keySet()) {
+ retInfo.add(new TopicProcessResult(0, topicName, result));
+ }
+ return retInfo;
}
- metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
- return new TopicProcessResult(0, entity.getTopicName(), result);
+ if (isAddOp) {
+ for (TopicCtrlEntity entity : entityMap.values()) {
+ metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+ retInfo.add(new TopicProcessResult(0, entity.getTopicName(), result));
+ }
+ } else {
+ TopicCtrlEntity curEntity;
+ Set<String> changedTopicSet = new HashSet<>();
+ for (TopicCtrlEntity entity : entityMap.values()) {
+ curEntity = metaConfigMapper.getTopicCtrlByTopicName(entity.getTopicName());
+ if (curEntity != null) {
+ if (curEntity.getMaxMsgSizeInB() != entity.getMaxMsgSizeInB()) {
+ changedTopicSet.add(entity.getTopicName());
+ }
+ }
+ metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+ retInfo.add(new TopicProcessResult(0, entity.getTopicName(), result));
+ }
+ if (!changedTopicSet.isEmpty()) {
+ triggerBrokerConfDataSync(changedTopicSet, strBuff, result);
+ }
+ }
+ return retInfo;
}
@Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index 95f94270b..6370cb98f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -404,7 +404,7 @@ public interface MetaDataService extends Server {
*
* @param isAddOp whether add operation
* @param opEntity operator information
- * @param topicName topic name
+ * @param topicNameSet topic name set
* @param topicNameId the topic name id
* @param enableTopicAuth whether enable topic authentication
* @param maxMsgSizeInMB the max message size in MB
@@ -412,22 +412,23 @@ public interface MetaDataService extends Server {
* @param result the process result return
* @return true if success otherwise false
*/
- TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
- String topicName, int topicNameId,
- Boolean enableTopicAuth, int maxMsgSizeInMB,
- StringBuilder strBuff, ProcessResult result);
+ List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ Set<String> topicNameSet, int topicNameId,
+ Boolean enableTopicAuth, int maxMsgSizeInMB,
+ StringBuilder strBuff, ProcessResult result);
/**
* Add or Update topic control configure info
*
- * @param isAddOp whether add operation
- * @param entity the topic control info entity will be add
- * @param strBuff the print info string buffer
- * @param result the process result return
+ * @param isAddOp whether add operation
+ * @param entityMap the topic control entities to add or update, keyed by topic name
+ * @param strBuff the print info string buffer
+ * @param result the process result return
* @return true if success otherwise false
*/
- TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
- StringBuilder strBuff, ProcessResult result);
+ List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp,
+ Map<String, TopicCtrlEntity> entityMap,
+ StringBuilder strBuff, ProcessResult result);
/**
* Insert topic control configure info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 8acfb75d9..ccfc05fc4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -356,6 +356,14 @@ public interface MetaConfigMapper extends KeepAliveService {
*/
Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
+ /**
+ * Get the ids of the brokers on which the specified topics are deployed
+ *
+ * @param topicNameSet the topic name set need to query
+ * @return the broker id set
+ */
+ Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet);
+
/**
* Get deployed topic set
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 26d782b9f..65368f831 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -118,6 +118,8 @@ public interface TopicDeployMapper extends AbstractMapper {
Map<String/* topicName */, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
+ Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet);
+
Set<String> getDeployedTopicSet();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 9e1421508..965706770 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -868,6 +868,11 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
return topicDeployMapper.getTopicBrokerInfo(topicNameSet);
}
+ @Override
+ public Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet) {
+ return topicDeployMapper.getDeployedBrokerIdByTopic(topicNameSet);
+ }
+
@Override
public Set<String> getDeployedTopicSet() {
return topicDeployMapper.getDeployedTopicSet();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index 5ed2e5102..f9c627cb9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -446,6 +446,30 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
return retEntityMap;
}
+ @Override
+ public Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet) {
+ ConcurrentHashSet<String> keySet;
+ Set<Integer> retSet = new HashSet<>();
+ if (topicNameSet == null || topicNameSet.isEmpty()) {
+ return retSet;
+ }
+ for (String topicName : topicNameSet) {
+ if (topicName == null) {
+ continue;
+ }
+ keySet = topicName2RecordCache.get(topicName);
+ if (keySet != null) {
+ for (String key : keySet) {
+ TopicDeployEntity entry = topicDeployCache.get(key);
+ if (entry != null) {
+ retSet.add(entry.getBrokerId());
+ }
+ }
+ }
+ }
+ return retSet;
+ }
+
@Override
public Set<String> getDeployedTopicSet() {
return new HashSet<>(topicName2RecordCache.keySet());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index 7dd1a6ead..f1d9f7f90 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -206,7 +206,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
- BaseEntity opEntity = (BaseEntity) result.getRetData();
+ final BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
@@ -242,17 +242,15 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebFieldDef.MAXMSGSIZEINMB, false,
(isAddOp ? maxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- maxMsgSizeMB, sBuffer, result)) {
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
return sBuffer;
}
maxMsgSizeMB = (int) result.getRetData();
// add or update records
- List<TopicProcessResult> retInfo = new ArrayList<>();
- for (String topicName : topicNameSet) {
- retInfo.add(defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, opEntity,
- topicName, topicNameId, enableTopicAuth, maxMsgSizeMB, sBuffer, result));
- }
+ List<TopicProcessResult> retInfo =
+ defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, opEntity,
+ topicNameSet, topicNameId, enableTopicAuth, maxMsgSizeMB, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}
@@ -273,11 +271,8 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
}
Map<String, TopicCtrlEntity> addRecordMap =
(Map<String, TopicCtrlEntity>) result.getRetData();
- List<TopicProcessResult> retInfo = new ArrayList<>();
- for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
- retInfo.add(defMetaDataService.addOrUpdTopicCtrlConf(
- isAddOp, topicCtrlInfo, sBuffer, result));
- }
+ List<TopicProcessResult> retInfo =
+ defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, addRecordMap, sBuffer, result);
return buildRetInfo(retInfo, sBuffer);
}
@@ -315,7 +310,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebFieldDef.MAXMSGSIZEINMB, false,
(isAddOp ? defMaxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- defMaxMsgSizeMB, sBuffer, result)) {
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
return result.isSuccess();
}
final int itemMaxMsgSizeMB = (int) result.getRetData();
@@ -370,5 +365,4 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
}
-
}