You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/10 09:43:06 UTC

[inlong] branch master updated: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd454ad4e [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
fd454ad4e is described below

commit fd454ad4e3f2cd316e68bf4d1655ff99f1c08879
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 10 17:43:01 2022 +0800

    [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
---
 .../server/common/utils/WebParameterUtils.java     | 23 ++++++++
 .../master/metamanage/DefaultMetaDataService.java  | 15 +++--
 .../server/master/metamanage/MetaDataService.java  |  9 +++
 .../dao/entity/GroupConsumeCtrlEntity.java         | 15 +++++
 .../metastore/dao/entity/GroupResCtrlEntity.java   | 22 ++++++++
 .../metastore/dao/entity/TopicCtrlEntity.java      | 64 ++++++++++++++++------
 .../metastore/dao/mapper/MetaConfigMapper.java     | 10 ++++
 .../metastore/dao/mapper/TopicCtrlMapper.java      | 10 ++++
 .../metastore/impl/AbsMetaConfigMapperImpl.java    | 58 +++++++-------------
 .../metastore/impl/AbsTopicCtrlMapperImpl.java     | 27 +++++++++
 .../impl/bdbimpl/BdbMetaConfigMapperImpl.java      |  2 +-
 .../master/web/handler/WebOtherInfoHandler.java    | 15 +++++
 .../master/web/handler/WebTopicDeployHandler.java  |  9 +--
 13 files changed, 213 insertions(+), 66 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index dd3e3c948..005489780 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.tubemq.server.common.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
@@ -725,6 +726,12 @@ public class WebParameterUtils {
             if (paramValue == null) {
                 paramValue = req.getParameter(fieldDef.shortName);
             }
+        } else if (paramCntr instanceof JsonObject) {
+            JsonObject jsonObject = (JsonObject) paramCntr;
+            paramValue = jsonObject.get(fieldDef.name).getAsString();
+            if (paramValue == null) {
+                paramValue = jsonObject.get(fieldDef.shortName).getAsString();
+            }
         } else {
             throw new IllegalArgumentException("Unknown parameter type!");
         }
@@ -1562,6 +1569,22 @@ public class WebParameterUtils {
         return strManageStatus;
     }
 
+    public static int getBrokerManageStatusId(String strManageStatus) {
+        int manageStatus = TStatusConstants.STATUS_MANAGE_NOT_DEFINED;
+        if (strManageStatus.equals("draft")) {
+            manageStatus = TStatusConstants.STATUS_MANAGE_APPLY;
+        } else if (strManageStatus.equals("online")) {
+            manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE;
+        } else if (strManageStatus.equals("offline")) {
+            manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
+        } else if (strManageStatus.equals("only-read")) {
+            manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE;
+        } else if (strManageStatus.equals("only-write")) {
+            manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ;
+        }
+        return manageStatus;
+    }
+
     /**
      * translate broker manage status from int to tuple2 value
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index d26d32810..c26ac6118 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -834,10 +834,9 @@ public class DefaultMetaDataService implements MetaDataService {
             int maxMsgSize = defMsgSizeInB;
             TopicCtrlEntity topicCtrlEntity =
                     metaConfigMapper.getTopicCtrlByTopicName(topicEntity.getTopicName());
-            if (topicCtrlEntity != null) {
-                if (topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) {
-                    maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB();
-                }
+            if (topicCtrlEntity != null
+                    && topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) {
+                maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB();
             }
             if (maxMsgSize == defMsgSizeInB) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
@@ -948,7 +947,8 @@ public class DefaultMetaDataService implements MetaDataService {
         ClusterSettingEntity clusterSettingEntity = metaConfigMapper.getClusterDefSetting(false);
         int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
         TopicCtrlEntity topicCtrlEntity = metaConfigMapper.getTopicCtrlByTopicName(topicName);
-        if (topicCtrlEntity != null) {
+        if (topicCtrlEntity != null
+                && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
             maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
         }
         return maxMsgSizeInMB;
@@ -960,6 +960,11 @@ public class DefaultMetaDataService implements MetaDataService {
         return metaConfigMapper.getTopicCtrlConf(topicNameSet, qryEntity);
     }
 
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        return metaConfigMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet);
+    }
     // //////////////////////////////////////////////////////////////////////////////
 
     @Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index 6370cb98f..8f26f2507 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -495,6 +495,15 @@ public interface MetaDataService extends Server {
     Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
                                                   TopicCtrlEntity qryEntity);
 
+    /**
+     * Get the max message size configuration of the given topics from the store
+     *
+     * @param defMaxMsgSizeInB  the default max message size in bytes
+     * @param topicNameSet  the set of topic names to look up
+     * @return the query result, read-only
+     */
+    Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, Set<String> topicNameSet);
+
     // //////////////////////////////////////////////////////////////////////////////
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 983b276ac..5a73c57ec 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -216,6 +216,21 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
         return changed;
     }
 
+    /**
+     * Fill undefined fields with their default values
+     *
+     * @return  this entity
+     */
+    public GroupConsumeCtrlEntity fillEmptyValues() {
+        if (this.filterEnable == EnableStatus.STATUS_UNDEFINE) {
+            this.filterEnable = EnableStatus.STATUS_DISABLE;
+        }
+        if (this.consumeEnable == EnableStatus.STATUS_UNDEFINE) {
+            this.consumeEnable = EnableStatus.STATUS_ENABLE;
+        }
+        return this;
+    }
+
     /**
      * Check whether the specified query item value matches
      * Allowed query items:
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 61c75aed3..ac2bac075 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -107,6 +107,28 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
         return this;
     }
 
+    /**
+     * Fill blank or undefined fields with their default values
+     *
+     * @return this entity
+     */
+    public GroupResCtrlEntity fillEmptyValues() {
+        if (TStringUtils.isBlank(this.flowCtrlInfo)) {
+            setFlowCtrlRule(0, TServerConstants.BLANK_FLOWCTRL_RULES);
+        }
+        if (this.resCheckStatus == EnableStatus.STATUS_UNDEFINE) {
+            this.resCheckStatus = EnableStatus.STATUS_DISABLE;
+            this.allowedBrokerClientRate = 0;
+        }
+        if (this.flowCtrlStatus == EnableStatus.STATUS_UNDEFINE) {
+            this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+        if (this.qryPriorityId == TBaseConstants.META_VALUE_UNDEFINED) {
+            this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
+        }
+        return this;
+    }
+
     public GroupResCtrlEntity setGroupName(String groupName) {
         this.groupName = groupName;
         return this;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 00702950a..2ad57a15b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -54,9 +54,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         this.topicName = topicName;
         this.topicNameId = topicNameId;
         this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
-        this.maxMsgSizeInB =
-                SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
-        this.maxMsgSizeInMB = maxMsgSizeInMB;
+        fillMaxMsgSizeInMB(maxMsgSizeInMB);
     }
 
     /**
@@ -165,14 +163,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         }
         // check and set modified field
         if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
-            int tmpMaxMsgSizeInMB =
-                    SettingValidUtils.validAndGetMsgSizeInMB(newMaxMsgSizeMB);
-            if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
-                changed = true;
-                this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
-                this.maxMsgSizeInB =
-                        SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
-            }
+            changed = fillMaxMsgSizeInMB(newMaxMsgSizeMB);
         }
         // check and set authCtrlStatus info
         if (enableTopicAuth != null
@@ -187,6 +178,27 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         return changed;
     }
 
+    /**
+     * Fill undefined fields with their default values
+     *
+     * @param defSetting  current cluster default setting, may be null
+     *
+     * @return  this entity
+     */
+    public TopicCtrlEntity fillEmptyValues(ClusterSettingEntity defSetting) {
+        if (this.maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) {
+            int tmpMaxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+            if (defSetting != null) {
+                tmpMaxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
+            }
+            fillMaxMsgSizeInMB(tmpMaxMsgSizeInMB);
+        }
+        if (this.authCtrlStatus == EnableStatus.STATUS_UNDEFINE) {
+            this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+        return this;
+    }
+
     /**
      * Check whether the specified query item value matches
      * Allowed query items:
@@ -242,14 +254,30 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
         return sBuilder;
     }
 
-    private void fillMaxMsgSizeInB(int maxMsgSizeInB) {
-        int tmpMaxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
-        if (maxMsgSizeInB > TBaseConstants.META_MB_UNIT_SIZE) {
-            tmpMaxMsgSizeInMB = SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB);
+    private boolean fillMaxMsgSizeInB(int maxMsgSizeInB) {
+        boolean changed = false;
+        int tmpMaxMsgSizeInMB =
+                SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB);
+        if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
+            changed = true;
+            this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
+            this.maxMsgSizeInB =
+                    SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
+        }
+        return changed;
+    }
+
+    private boolean fillMaxMsgSizeInMB(int maxMsgSizeInMB) {
+        boolean changed = false;
+        int tmpMaxMsgSizeInMB =
+                SettingValidUtils.validAndGetMsgSizeInMB(maxMsgSizeInMB);
+        if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
+            changed = true;
+            this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
+            this.maxMsgSizeInB =
+                    SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
         }
-        this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
-        this.maxMsgSizeInB =
-                SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(this.maxMsgSizeInMB);
+        return changed;
     }
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index ccfc05fc4..f3cd23e7c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -238,6 +238,16 @@ public interface MetaConfigMapper extends KeepAliveService {
     Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
                                                   TopicCtrlEntity qryEntity);
 
+    /**
+     * Get the max message size configuration of the given topics from the store
+     *
+     * @param defMaxMsgSizeInB  the default max message size in bytes
+     * @param topicNameSet  the set of topic names to look up
+     * @return the query result, read-only
+     */
+    Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                  Set<String> topicNameSet);
+
     // ////////////////////////////////////////////////////////////
 
     /**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
index 1df1397dc..bb0882b2a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
@@ -82,4 +82,14 @@ public interface TopicCtrlMapper extends AbstractMapper {
      */
     Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
                                                   TopicCtrlEntity qryEntity);
+
+    /**
+     * Get the max message size configuration of the given topics from the store
+     *
+     * @param defMaxMsgSizeInB  the default max message size in bytes
+     * @param topicNameSet  the set of topic names to look up
+     * @return the query result, read-only
+     */
+    Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                  Set<String> topicNameSet);
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 965706770..f54ceebdf 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -451,10 +451,9 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         ClusterSettingEntity clusterSettingEntity = getClusterDefSetting(false);
         int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
         TopicCtrlEntity topicCtrlEntity = topicCtrlMapper.getTopicCtrlConf(topicName);
-        if (topicCtrlEntity != null) {
-            if (topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
-                maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
-            }
+        if (topicCtrlEntity != null
+                && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
+            maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
         }
         return maxMsgSizeInMB;
     }
@@ -470,6 +469,12 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         return topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity);
     }
 
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        return topicCtrlMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet);
+    }
+
     /**
      * Add if absent topic control configure info
      *
@@ -481,13 +486,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
      */
     private boolean addTopicCtrlConfIfAbsent(BaseEntity opEntity, String topicName,
                                              StringBuilder strBuff, ProcessResult result) {
-        int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
-        ClusterSettingEntity defSetting = getClusterDefSetting(false);
-        if (defSetting != null) {
-            maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
-        }
         TopicCtrlEntity entity = new TopicCtrlEntity(opEntity, topicName,
-                TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB);
+                TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
         return innAddOrUpdTopicCtrlConf(false, true, entity, strBuff, result);
     }
 
@@ -524,26 +524,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
                     strBuff.delete(0, strBuff.length());
                     return result.isSuccess();
                 }
-                if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED
-                        || entity.getMaxMsgSizeInMB() == TBaseConstants.META_VALUE_UNDEFINED) {
-                    int topicId = entity.getTopicId();
-                    int maxMsgSizeInMB = entity.getMaxMsgSizeInMB();
-                    if (topicId == TBaseConstants.META_VALUE_UNDEFINED) {
-                        topicId = TServerConstants.TOPIC_ID_DEF;
-                    }
-                    if (maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) {
-                        maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
-                        ClusterSettingEntity defSetting = getClusterDefSetting(false);
-                        if (defSetting != null) {
-                            maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
-                        }
-                    }
-                    newEntity = new TopicCtrlEntity(entity, entity.getTopicName(), topicId, maxMsgSizeInMB);
-                    topicCtrlMapper.addTopicCtrlConf(newEntity, strBuff, result);
-
-                } else {
-                    topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
-                }
+                entity.fillEmptyValues(getClusterDefSetting(false));
+                topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
             } else {
                 if (isAddOpOrOnlyAdd) {
                     if (chkConsistent) {
@@ -609,7 +591,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         Integer topicLockId = null;
         Integer brokerLockId = null;
         // add topic control configure
-        addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+        BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
+        addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result);
         // execute add or update operation
         try {
             // lock topicName meta-lock
@@ -653,11 +636,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
                     newProps.updModifyInfo(brokerEntity.getTopicProps());
                     newEntity = new TopicDeployEntity(entity,
                             entity.getBrokerId(), entity.getTopicName(), newProps);
-                    int topicId = entity.getTopicId();
-                    if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED) {
-                        topicId = TServerConstants.TOPIC_ID_DEF;
-                    }
-                    newEntity.updModifyInfo(entity.getDataVerId(), topicId,
+                    newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(),
                             brokerEntity.getBrokerPort(), brokerEntity.getBrokerIp(),
                             entity.getTopicStatus(), entity.getTopicProps());
                     topicDeployMapper.addTopicDeployConf(newEntity, strBuff, result);
@@ -973,6 +952,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
                     strBuff.delete(0, strBuff.length());
                     return result.isSuccess();
                 }
+                entity.fillEmptyValues();
                 groupResCtrlMapper.addGroupResCtrlConf(entity, strBuff, result);
             } else {
                 if (isAddOpOrOnlyAdd) {
@@ -1116,9 +1096,10 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
         GroupConsumeCtrlEntity newEntity;
         String printPrefix = "[addConsumeCtrlConf], ";
         // append topic control configure
-        addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+        BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
+        addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result);
         // append group control configure
-        addGroupCtrlConfIfAbsent(entity, entity.getGroupName(), strBuff, result);
+        addGroupCtrlConfIfAbsent(opEntity, entity.getGroupName(), strBuff, result);
         // execute add or update operation
         try {
             // lock topicName meta-lock
@@ -1138,6 +1119,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
                         strBuff.delete(0, strBuff.length());
                         return result.isSuccess();
                     }
+                    entity.fillEmptyValues();
                     consumeCtrlMapper.addGroupConsumeCtrlConf(entity, strBuff, result);
                 } else {
                     if (isAddOpOrOnlyAdd) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
index e16440d8e..3457ee27d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
 import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
@@ -147,6 +148,32 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
         return retEntityMap;
     }
 
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        Map<String, Integer> resultMap = new HashMap<>();
+        if (topicNameSet == null || topicNameSet.isEmpty()) {
+            return resultMap;
+        }
+        TopicCtrlEntity ctrlEntity;
+        for (String topic : topicNameSet) {
+            if (topic == null) {
+                continue;
+            }
+            ctrlEntity = topicCtrlCache.get(topic);
+            if (ctrlEntity == null) {
+                continue;
+            }
+            if (ctrlEntity.getMaxMsgSizeInB() == TBaseConstants.META_VALUE_UNDEFINED
+                    || ctrlEntity.getMaxMsgSizeInB() == defMaxMsgSizeInB) {
+                resultMap.put(topic, null);
+            } else {
+                resultMap.put(topic, ctrlEntity.getMaxMsgSizeInB());
+            }
+        }
+        return resultMap;
+    }
+
     /**
      * Clear cached data
      */
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
index f1442d76c..938b2537c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
@@ -279,7 +279,7 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
             return clusterGroupVO;
         }
         // translate replication group info to ClusterGroupVO structure
-        Tuple2<Boolean, List<ClusterNodeVO>>  transResult =
+        Tuple2<Boolean, List<ClusterNodeVO>> transResult =
                 transReplicateNodes(replicationGroup);
         clusterGroupVO.setNodeData(transResult.getF1());
         clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 4418fa68e..9fcc779d7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.inlong.tubemq.corebase.cluster.Partition;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
 import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
 import org.apache.inlong.tubemq.server.common.TubeServerVersion;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
@@ -271,6 +272,17 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                         .append(",\"reqSourceCount\":").append(reqSourceCount)
                         .append(",\"curSourceCount\":").append(curSourceCount)
                         .append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
+            } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+                Tuple2<Long, List<String>> metaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+                sBuffer.append(",\"topicMetaId\":").append(metaInfoTuple.getF0())
+                        .append(",\"metaDetails\":[");
+                for (String itemInfo : metaInfoTuple.getF1()) {
+                    if (itemCnt++ > 0) {
+                        sBuffer.append(",");
+                    }
+                    sBuffer.append("\"").append(itemInfo).append("\"");
+                }
+                sBuffer.append("]");
             }
             sBuffer.append(",\"rebInfo\":{");
             if (balanceStatus == -2) {
@@ -515,6 +527,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
                         }
                         strBuffer.append("]");
                     }
+                } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+                    strBuffer.append(",\"sourceCount\":").append(consumer.getSourceCount())
+                            .append(",\"nodeId\":").append(consumer.getNodeId());
                 }
                 Map<String, Map<String, Partition>> topicSubMap =
                         currentSubInfoMap.get(consumer.getConsumerId());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index b92a53324..344cb0f71 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -379,15 +379,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
                 defMetaDataService.getBrokerTopicConfigInfo(brokerIds);
         // build query result
         int dataCount = 0;
+        int topicCnt = 0;
         WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
         for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
             if (dataCount++ > 0) {
                 sBuffer.append(",");
             }
-            sBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
-            int topicCnt = 0;
-            Set<String> topicSet = entry.getValue();
-            for (String topic : topicSet) {
+            topicCnt = 0;
+            sBuffer.append("{\"brokerId\":").append(entry.getKey())
+                    .append(",\"topicName\":[");
+            for (String topic : entry.getValue()) {
                 if (topicCnt++ > 0) {
                     sBuffer.append(",");
                 }