You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/09 10:21:43 UTC

[inlong] branch master updated: [INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d5345d01c [INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)
d5345d01c is described below

commit d5345d01cf28d83a7d68d365c6a4787d80415b49
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Oct 9 18:21:39 2022 +0800

    [INLONG-6110][TubeMQ] Optimize the implementation logic of adding or modifying TopicCtrlEntity records (#6111)
---
 .../master/metamanage/DefaultMetaDataService.java  | 109 +++++++++++++++++----
 .../server/master/metamanage/MetaDataService.java  |  23 ++---
 .../metastore/dao/mapper/MetaConfigMapper.java     |   8 ++
 .../metastore/dao/mapper/TopicDeployMapper.java    |   2 +
 .../metastore/impl/AbsMetaConfigMapperImpl.java    |   5 +
 .../metastore/impl/AbsTopicDeployMapperImpl.java   |  24 +++++
 .../master/web/handler/WebTopicCtrlHandler.java    |  22 ++---
 7 files changed, 149 insertions(+), 44 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 273604b5c..d26d32810 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -233,7 +233,7 @@ public class DefaultMetaDataService implements MetaDataService {
         if (!disableCsmTopicSet.isEmpty()) {
             result.setFailResult(TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
                     strBuff.append("[unAuthorized Group] ").append(consumerId)
-                            .append("'s consumerGroup not authorized by administrator, unAuthorizedTopics : ")
+                            .append("'s consumerGroup not authorized by administrator, unAuthorizedTopics: ")
                             .append(disableCsmTopicSet).toString());
             strBuff.delete(0, strBuff.length());
             return result.isSuccess();
@@ -432,8 +432,9 @@ public class DefaultMetaDataService implements MetaDataService {
         // query the operated object
         BrokerConfEntity curEntry = metaConfigMapper.getBrokerConfByBrokerId(brokerId);
         if (curEntry == null) {
-            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
-                    "The broker configure not exist!");
+            result.setFullInfo(true,
+                    DataOpErrCode.DERR_SUCCESS.getCode(),
+                    DataOpErrCode.DERR_SUCCESS.getDescription());
             return new BrokerProcessResult(brokerId, "", result);
         }
         // check broker's manage status
@@ -501,8 +502,48 @@ public class DefaultMetaDataService implements MetaDataService {
             return result.isSuccess();
         }
         runStatusInfo.notifyDataChanged();
-        strBuff.append("[Meta data] triggered broker syncStatus info is ");
-        logger.info(runStatusInfo.toJsonString(strBuff).toString());
+        logger.info(strBuff.append("[Meta data] trigger broker syncStatus info, brokerId is ")
+                .append(brokerId).toString());
+        strBuff.delete(0, strBuff.length());
+        result.setSuccResult(null);
+        return result.isSuccess();
+    }
+
+    /**
+     * Reload topic's deploy config info
+     *
+     * @param topicNameSet  the topic name set
+     * @param strBuff       the string buffer
+     * @param result        the process return result
+     * @return true if success otherwise false
+     */
+    private boolean triggerBrokerConfDataSync(Set<String> topicNameSet,
+                                              StringBuilder strBuff,
+                                              ProcessResult result) {
+        if (!metaConfigMapper.checkStoreStatus(true, result)) {
+            return result.isSuccess();
+        }
+        Set<Integer> brokerIdSet =
+                metaConfigMapper.getDeployedBrokerIdByTopic(topicNameSet);
+        if (brokerIdSet.isEmpty()) {
+            result.setSuccResult();
+            return result.isSuccess();
+        }
+        BrokerRunStatusInfo runStatusInfo;
+        BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+        for (Integer brokerId : brokerIdSet) {
+            if (brokerId == null) {
+                continue;
+            }
+            runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
+            if (runStatusInfo == null) {
+                continue;
+            }
+            runStatusInfo.notifyDataChanged();
+        }
+        logger.info(strBuff.append("[Meta data] trigger broker syncStatus info for")
+                .append(" maxMsgSize modify, brokerId set is ").append(brokerIdSet)
+                .toString());
         strBuff.delete(0, strBuff.length());
         result.setSuccResult(null);
         return result.isSuccess();
@@ -810,26 +851,56 @@ public class DefaultMetaDataService implements MetaDataService {
     }
 
     @Override
-    public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
-                                                    String topicName, int topicNameId,
-                                                    Boolean enableTopicAuth, int maxMsgSizeInMB,
-                                                    StringBuilder strBuff, ProcessResult result) {
-        TopicCtrlEntity entity =
-                new TopicCtrlEntity(opEntity, topicName);
-        entity.updModifyInfo(opEntity.getDataVerId(),
-                topicNameId, maxMsgSizeInMB, enableTopicAuth);
-        return addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+    public List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                                          Set<String> topicNameSet, int topicNameId,
+                                                          Boolean enableTopicAuth, int maxMsgSizeInMB,
+                                                          StringBuilder strBuff, ProcessResult result) {
+        TopicCtrlEntity entity;
+        Map<String, TopicCtrlEntity> topicCtrlEntityMap = new HashMap<>();
+        for (String topicName : topicNameSet) {
+            entity = new TopicCtrlEntity(opEntity, topicName);
+            entity.updModifyInfo(opEntity.getDataVerId(),
+                    topicNameId, maxMsgSizeInMB, enableTopicAuth);
+            topicCtrlEntityMap.put(topicName, entity);
+        }
+        return addOrUpdTopicCtrlConf(isAddOp, topicCtrlEntityMap, strBuff, result);
     }
 
     @Override
-    public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
-                                                    StringBuilder strBuff, ProcessResult result) {
+    public List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp,
+                                                          Map<String, TopicCtrlEntity> entityMap,
+                                                          StringBuilder strBuff, ProcessResult result) {
+        List<TopicProcessResult> retInfo = new ArrayList<>();
         // check current status
         if (!metaConfigMapper.checkStoreStatus(true, result)) {
-            return new TopicProcessResult(0, entity.getTopicName(), result);
+            for (String topicName : entityMap.keySet()) {
+                retInfo.add(new TopicProcessResult(0, topicName, result));
+            }
+            return retInfo;
         }
-        metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
-        return new TopicProcessResult(0, entity.getTopicName(), result);
+        if (isAddOp) {
+            for (TopicCtrlEntity entity : entityMap.values()) {
+                metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+                retInfo.add(new TopicProcessResult(0, entity.getTopicName(), result));
+            }
+        } else {
+            TopicCtrlEntity curEntity;
+            Set<String> changedTopicSet = new HashSet<>();
+            for (TopicCtrlEntity entity : entityMap.values()) {
+                curEntity = metaConfigMapper.getTopicCtrlByTopicName(entity.getTopicName());
+                if (curEntity != null) {
+                    if (curEntity.getMaxMsgSizeInB() != entity.getMaxMsgSizeInB()) {
+                        changedTopicSet.add(entity.getTopicName());
+                    }
+                }
+                metaConfigMapper.addOrUpdTopicCtrlConf(isAddOp, entity, strBuff, result);
+                retInfo.add(new TopicProcessResult(0, entity.getTopicName(), result));
+            }
+            if (!changedTopicSet.isEmpty()) {
+                triggerBrokerConfDataSync(changedTopicSet, strBuff, result);
+            }
+        }
+        return retInfo;
     }
 
     @Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index 95f94270b..6370cb98f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -404,7 +404,7 @@ public interface MetaDataService extends Server {
      *
      * @param isAddOp        whether add operation
      * @param opEntity       operator information
-     * @param topicName      topic name
+     * @param topicNameSet   topic name set
      * @param topicNameId    the topic name id
      * @param enableTopicAuth  whether enable topic authentication
      * @param maxMsgSizeInMB   the max message size in MB
@@ -412,22 +412,23 @@ public interface MetaDataService extends Server {
      * @param result           the process result return
      * @return true if success otherwise false
      */
-    TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
-                                             String topicName, int topicNameId,
-                                             Boolean enableTopicAuth, int maxMsgSizeInMB,
-                                             StringBuilder strBuff, ProcessResult result);
+    List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+                                                   Set<String> topicNameSet, int topicNameId,
+                                                   Boolean enableTopicAuth, int maxMsgSizeInMB,
+                                                   StringBuilder strBuff, ProcessResult result);
 
     /**
      * Add or Update topic control configure info
      *
-     * @param isAddOp  whether add operation
-     * @param entity   the topic control info entity will be add
-     * @param strBuff  the print info string buffer
-     * @param result   the process result return
+     * @param isAddOp    whether add operation
+     * @param entityMap  the topic control entity map which will be add
+     * @param strBuff    the print info string buffer
+     * @param result     the process result return
      * @return true if success otherwise false
      */
-    TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
-                                             StringBuilder strBuff, ProcessResult result);
+    List<TopicProcessResult> addOrUpdTopicCtrlConf(boolean isAddOp,
+                                                   Map<String, TopicCtrlEntity> entityMap,
+                                                   StringBuilder strBuff, ProcessResult result);
 
     /**
      * Insert topic control configure info
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index 8acfb75d9..ccfc05fc4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -356,6 +356,14 @@ public interface MetaConfigMapper extends KeepAliveService {
      */
     Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
 
+    /**
+     * Get deployed broker id for the special topic name set
+     *
+     * @param topicNameSet   the topic name set need to query
+     * @return  the broker id set
+     */
+    Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet);
+
     /**
      * Get deployed topic set
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
index 26d782b9f..65368f831 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicDeployMapper.java
@@ -118,6 +118,8 @@ public interface TopicDeployMapper extends AbstractMapper {
 
     Map<String/* topicName */, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet);
 
+    Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet);
+
     Set<String> getDeployedTopicSet();
 
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 9e1421508..965706770 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -868,6 +868,11 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         return topicDeployMapper.getTopicBrokerInfo(topicNameSet);
     }
 
+    @Override
+    public Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet) {
+        return topicDeployMapper.getDeployedBrokerIdByTopic(topicNameSet);
+    }
+
     @Override
     public Set<String> getDeployedTopicSet() {
         return topicDeployMapper.getDeployedTopicSet();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index 5ed2e5102..f9c627cb9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -446,6 +446,30 @@ public abstract class AbsTopicDeployMapperImpl implements TopicDeployMapper {
         return retEntityMap;
     }
 
+    @Override
+    public Set<Integer> getDeployedBrokerIdByTopic(Set<String> topicNameSet) {
+        ConcurrentHashSet<String> keySet;
+        Set<Integer> retSet = new HashSet<>();
+        if (topicNameSet == null || topicNameSet.isEmpty()) {
+            return retSet;
+        }
+        for (String topicName : topicNameSet) {
+            if (topicName == null) {
+                continue;
+            }
+            keySet = topicName2RecordCache.get(topicName);
+            if (keySet != null) {
+                for (String key : keySet) {
+                    TopicDeployEntity entry = topicDeployCache.get(key);
+                    if (entry != null) {
+                        retSet.add(entry.getBrokerId());
+                    }
+                }
+            }
+        }
+        return retSet;
+    }
+
     @Override
     public Set<String> getDeployedTopicSet() {
         return new HashSet<>(topicName2RecordCache.keySet());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index 7dd1a6ead..f1d9f7f90 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -206,7 +206,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
-        BaseEntity opEntity = (BaseEntity) result.getRetData();
+        final BaseEntity opEntity = (BaseEntity) result.getRetData();
         // check and get topicName field
         if (!WebParameterUtils.getStringParamValue(req,
                 WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
@@ -242,17 +242,15 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
                 WebFieldDef.MAXMSGSIZEINMB, false,
                 (isAddOp ? maxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
                 TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
-                maxMsgSizeMB, sBuffer, result)) {
+                TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
             WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
             return sBuffer;
         }
         maxMsgSizeMB = (int) result.getRetData();
         // add or update records
-        List<TopicProcessResult> retInfo = new ArrayList<>();
-        for (String topicName : topicNameSet) {
-            retInfo.add(defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, opEntity,
-                    topicName, topicNameId, enableTopicAuth, maxMsgSizeMB, sBuffer, result));
-        }
+        List<TopicProcessResult> retInfo =
+                defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, opEntity,
+                        topicNameSet, topicNameId, enableTopicAuth, maxMsgSizeMB, sBuffer, result);
         return buildRetInfo(retInfo, sBuffer);
     }
 
@@ -273,11 +271,8 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
         }
         Map<String, TopicCtrlEntity> addRecordMap =
                 (Map<String, TopicCtrlEntity>) result.getRetData();
-        List<TopicProcessResult> retInfo = new ArrayList<>();
-        for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
-            retInfo.add(defMetaDataService.addOrUpdTopicCtrlConf(
-                    isAddOp, topicCtrlInfo, sBuffer, result));
-        }
+        List<TopicProcessResult> retInfo =
+                defMetaDataService.addOrUpdTopicCtrlConf(isAddOp, addRecordMap, sBuffer, result);
         return buildRetInfo(retInfo, sBuffer);
     }
 
@@ -315,7 +310,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
                     WebFieldDef.MAXMSGSIZEINMB, false,
                     (isAddOp ? defMaxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
                     TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
-                    defMaxMsgSizeMB, sBuffer, result)) {
+                    TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
                 return result.isSuccess();
             }
             final int itemMaxMsgSizeMB = (int) result.getRetData();
@@ -370,5 +365,4 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
         WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
         return sBuffer;
     }
-
 }