You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/10 09:43:06 UTC
[inlong] branch master updated: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd454ad4e [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
fd454ad4e is described below
commit fd454ad4e3f2cd316e68bf4d1655ff99f1c08879
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 10 17:43:01 2022 +0800
[INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
---
.../server/common/utils/WebParameterUtils.java | 23 ++++++++
.../master/metamanage/DefaultMetaDataService.java | 15 +++--
.../server/master/metamanage/MetaDataService.java | 9 +++
.../dao/entity/GroupConsumeCtrlEntity.java | 15 +++++
.../metastore/dao/entity/GroupResCtrlEntity.java | 22 ++++++++
.../metastore/dao/entity/TopicCtrlEntity.java | 64 ++++++++++++++++------
.../metastore/dao/mapper/MetaConfigMapper.java | 10 ++++
.../metastore/dao/mapper/TopicCtrlMapper.java | 10 ++++
.../metastore/impl/AbsMetaConfigMapperImpl.java | 58 +++++++-------------
.../metastore/impl/AbsTopicCtrlMapperImpl.java | 27 +++++++++
.../impl/bdbimpl/BdbMetaConfigMapperImpl.java | 2 +-
.../master/web/handler/WebOtherInfoHandler.java | 15 +++++
.../master/web/handler/WebTopicDeployHandler.java | 9 +--
13 files changed, 213 insertions(+), 66 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index dd3e3c948..005489780 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.server.common.utils;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
@@ -725,6 +726,12 @@ public class WebParameterUtils {
if (paramValue == null) {
paramValue = req.getParameter(fieldDef.shortName);
}
+ } else if (paramCntr instanceof JsonObject) {
+ JsonObject jsonObject = (JsonObject) paramCntr;
+ paramValue = jsonObject.get(fieldDef.name).getAsString();
+ if (paramValue == null) {
+ paramValue = jsonObject.get(fieldDef.shortName).getAsString();
+ }
} else {
throw new IllegalArgumentException("Unknown parameter type!");
}
@@ -1562,6 +1569,22 @@ public class WebParameterUtils {
return strManageStatus;
}
+    /**
+     * Translate broker manage status from its string name to the int status id.
+     *
+     * @param strManageStatus the manage status name, one of "draft", "online",
+     *                        "offline", "only-read" or "only-write"
+     * @return the matching TStatusConstants status id, or
+     *         TStatusConstants.STATUS_MANAGE_NOT_DEFINED when the name is null
+     *         or is not one of the names listed above
+     */
+    public static int getBrokerManageStatusId(String strManageStatus) {
+        // Literal-first equals() lets a null name fall through to NOT_DEFINED
+        // instead of throwing a NullPointerException.
+        if ("draft".equals(strManageStatus)) {
+            return TStatusConstants.STATUS_MANAGE_APPLY;
+        }
+        if ("online".equals(strManageStatus)) {
+            return TStatusConstants.STATUS_MANAGE_ONLINE;
+        }
+        if ("offline".equals(strManageStatus)) {
+            return TStatusConstants.STATUS_MANAGE_OFFLINE;
+        }
+        if ("only-read".equals(strManageStatus)) {
+            return TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE;
+        }
+        if ("only-write".equals(strManageStatus)) {
+            return TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ;
+        }
+        return TStatusConstants.STATUS_MANAGE_NOT_DEFINED;
+    }
+
/**
* translate broker manage status from int to tuple2 value
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index d26d32810..c26ac6118 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -834,10 +834,9 @@ public class DefaultMetaDataService implements MetaDataService {
int maxMsgSize = defMsgSizeInB;
TopicCtrlEntity topicCtrlEntity =
metaConfigMapper.getTopicCtrlByTopicName(topicEntity.getTopicName());
- if (topicCtrlEntity != null) {
- if (topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) {
- maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB();
- }
+ if (topicCtrlEntity != null
+ && topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) {
+ maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB();
}
if (maxMsgSize == defMsgSizeInB) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
@@ -948,7 +947,8 @@ public class DefaultMetaDataService implements MetaDataService {
ClusterSettingEntity clusterSettingEntity = metaConfigMapper.getClusterDefSetting(false);
int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
TopicCtrlEntity topicCtrlEntity = metaConfigMapper.getTopicCtrlByTopicName(topicName);
- if (topicCtrlEntity != null) {
+ if (topicCtrlEntity != null
+ && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
}
return maxMsgSizeInMB;
@@ -960,6 +960,11 @@ public class DefaultMetaDataService implements MetaDataService {
return metaConfigMapper.getTopicCtrlConf(topicNameSet, qryEntity);
}
+    /**
+     * Look up the max message size, in bytes, of the given topics by forwarding
+     * both arguments unchanged to the meta configure mapper.
+     *
+     * @param defMaxMsgSizeInB the default max message size in bytes
+     * @param topicNameSet     the topic names to look up
+     * @return the map returned by the meta configure mapper
+     */
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        final Map<String, Integer> topicSizeMap =
+                metaConfigMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet);
+        return topicSizeMap;
+    }
// //////////////////////////////////////////////////////////////////////////////
@Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
index 6370cb98f..8f26f2507 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java
@@ -495,6 +495,15 @@ public interface MetaDataService extends Server {
Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
TopicCtrlEntity qryEntity);
+ /**
+ * Get the max message size settings, in bytes, of the specified topics from the store.
+ *
+ * @param defMaxMsgSizeInB the default max message size in bytes
+ * @param topicNameSet the set of topic names to look up
+ * @return the query result keyed by topic name; callers should treat it as read-only
+ */
+ Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, Set<String> topicNameSet);
+
// //////////////////////////////////////////////////////////////////////////////
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 983b276ac..5a73c57ec 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -216,6 +216,21 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
return changed;
}
+    /**
+     * Replace the enable flags that are still undefined with their defaults:
+     * consuming defaults to enabled, filtering defaults to disabled.
+     *
+     * @return this entity
+     */
+    public GroupConsumeCtrlEntity fillEmptyValues() {
+        // The two flags are independent of each other.
+        if (consumeEnable == EnableStatus.STATUS_UNDEFINE) {
+            consumeEnable = EnableStatus.STATUS_ENABLE;
+        }
+        if (filterEnable == EnableStatus.STATUS_UNDEFINE) {
+            filterEnable = EnableStatus.STATUS_DISABLE;
+        }
+        return this;
+    }
+
/**
* Check whether the specified query item value matches
* Allowed query items:
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 61c75aed3..ac2bac075 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -107,6 +107,28 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
return this;
}
+ /**
+ * Replace the fields that are still blank or undefined with default values.
+ *
+ * @return this entity
+ */
+ public GroupResCtrlEntity fillEmptyValues() {
+ // no flow control info yet: install the blank rule set
+ // NOTE(review): the first argument (0) is presumably the rule count - confirm against setFlowCtrlRule
+ if (TStringUtils.isBlank(this.flowCtrlInfo)) {
+ setFlowCtrlRule(0, TServerConstants.BLANK_FLOWCTRL_RULES);
+ }
+ // an undefined resource check status becomes disabled, and the allowed rate is reset with it
+ if (this.resCheckStatus == EnableStatus.STATUS_UNDEFINE) {
+ this.resCheckStatus = EnableStatus.STATUS_DISABLE;
+ this.allowedBrokerClientRate = 0;
+ }
+ // an undefined flow control status becomes disabled
+ if (this.flowCtrlStatus == EnableStatus.STATUS_UNDEFINE) {
+ this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
+ }
+ // an undefined query priority falls back to the server default value
+ if (this.qryPriorityId == TBaseConstants.META_VALUE_UNDEFINED) {
+ this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
+ }
+ return this;
+ }
+
public GroupResCtrlEntity setGroupName(String groupName) {
this.groupName = groupName;
return this;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 00702950a..2ad57a15b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -54,9 +54,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
this.topicName = topicName;
this.topicNameId = topicNameId;
this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
- this.maxMsgSizeInB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
- this.maxMsgSizeInMB = maxMsgSizeInMB;
+ fillMaxMsgSizeInMB(maxMsgSizeInMB);
}
/**
@@ -165,14 +163,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
}
// check and set modified field
if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) {
- int tmpMaxMsgSizeInMB =
- SettingValidUtils.validAndGetMsgSizeInMB(newMaxMsgSizeMB);
- if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
- changed = true;
- this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
- this.maxMsgSizeInB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
- }
+ changed = fillMaxMsgSizeInMB(newMaxMsgSizeMB);
}
// check and set authCtrlStatus info
if (enableTopicAuth != null
@@ -187,6 +178,27 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
return changed;
}
+    /**
+     * Replace the fields that are still undefined with default values.
+     *
+     * NOTE(review): the size branch only runs while maxMsgSizeInMB still equals
+     * META_VALUE_UNDEFINED; confirm that the validation done when the size is
+     * first filled can actually leave that value in place.
+     *
+     * @param defSetting the current cluster default setting; when null,
+     *                   TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB is used
+     *                   as the default max message size
+     * @return this entity
+     */
+    public TopicCtrlEntity fillEmptyValues(ClusterSettingEntity defSetting) {
+        if (maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) {
+            fillMaxMsgSizeInMB((defSetting == null)
+                    ? TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB
+                    : defSetting.getMaxMsgSizeInMB());
+        }
+        if (authCtrlStatus == EnableStatus.STATUS_UNDEFINE) {
+            authCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+        return this;
+    }
+
/**
* Check whether the specified query item value matches
* Allowed query items:
@@ -242,14 +254,30 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
return sBuilder;
}
- private void fillMaxMsgSizeInB(int maxMsgSizeInB) {
- int tmpMaxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
- if (maxMsgSizeInB > TBaseConstants.META_MB_UNIT_SIZE) {
- tmpMaxMsgSizeInMB = SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB);
+ /**
+ * Set the max message size from a value given in bytes.
+ *
+ * The value is converted to MB by SettingValidUtils.validAndGetMsgSizeBtoMB;
+ * both size fields are rewritten only when the converted value differs from
+ * the current maxMsgSizeInMB.
+ *
+ * @param maxMsgSizeInB the max message size in bytes
+ * @return true if the stored size was changed, false otherwise
+ */
+ private boolean fillMaxMsgSizeInB(int maxMsgSizeInB) {
+ boolean changed = false;
+ int tmpMaxMsgSizeInMB =
+ SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB);
+ if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
+ changed = true;
+ this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
+ // keep the byte value in step with the MB value just stored
+ this.maxMsgSizeInB =
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
+ }
+ return changed;
+ }
+
+ /**
+ * Set the max message size from a value given in MB.
+ *
+ * The value is passed through SettingValidUtils.validAndGetMsgSizeInMB;
+ * both size fields are rewritten only when the validated value differs from
+ * the current maxMsgSizeInMB.
+ *
+ * @param maxMsgSizeInMB the max message size in MB
+ * @return true if the stored size was changed, false otherwise
+ */
+ private boolean fillMaxMsgSizeInMB(int maxMsgSizeInMB) {
+ boolean changed = false;
+ int tmpMaxMsgSizeInMB =
+ SettingValidUtils.validAndGetMsgSizeInMB(maxMsgSizeInMB);
+ if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) {
+ changed = true;
+ this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
+ // keep the byte value in step with the MB value just stored
+ this.maxMsgSizeInB =
+ SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB);
}
- this.maxMsgSizeInMB = tmpMaxMsgSizeInMB;
- this.maxMsgSizeInB =
- SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(this.maxMsgSizeInMB);
+ return changed;
}
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
index ccfc05fc4..f3cd23e7c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java
@@ -238,6 +238,16 @@ public interface MetaConfigMapper extends KeepAliveService {
Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
TopicCtrlEntity qryEntity);
+ /**
+ * Get the max message size settings, in bytes, of the specified topics from the store.
+ *
+ * @param defMaxMsgSizeInB the default max message size in bytes
+ * @param topicNameSet the set of topic names to look up
+ * @return the query result keyed by topic name; callers should treat it as read-only
+ */
+ Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+ Set<String> topicNameSet);
+
// ////////////////////////////////////////////////////////////
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
index 1df1397dc..bb0882b2a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java
@@ -82,4 +82,14 @@ public interface TopicCtrlMapper extends AbstractMapper {
*/
Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet,
TopicCtrlEntity qryEntity);
+
+ /**
+ * Get the max message size settings, in bytes, of the specified topics from the store.
+ *
+ * @param defMaxMsgSizeInB the default max message size in bytes
+ * @param topicNameSet the set of topic names to look up
+ * @return the query result keyed by topic name; callers should treat it as read-only
+ */
+ Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+ Set<String> topicNameSet);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 965706770..f54ceebdf 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -451,10 +451,9 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
ClusterSettingEntity clusterSettingEntity = getClusterDefSetting(false);
int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
TopicCtrlEntity topicCtrlEntity = topicCtrlMapper.getTopicCtrlConf(topicName);
- if (topicCtrlEntity != null) {
- if (topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
- maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
- }
+ if (topicCtrlEntity != null
+ && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) {
+ maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
}
return maxMsgSizeInMB;
}
@@ -470,6 +469,12 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
return topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity);
}
+    /**
+     * Look up the max message size, in bytes, of the given topics by forwarding
+     * both arguments unchanged to the topic control mapper.
+     *
+     * @param defMaxMsgSizeInB the default max message size in bytes
+     * @param topicNameSet     the topic names to look up
+     * @return the map returned by the topic control mapper
+     */
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        final Map<String, Integer> topicSizeMap =
+                topicCtrlMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet);
+        return topicSizeMap;
+    }
+
/**
* Add if absent topic control configure info
*
@@ -481,13 +486,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
*/
private boolean addTopicCtrlConfIfAbsent(BaseEntity opEntity, String topicName,
StringBuilder strBuff, ProcessResult result) {
- int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
- ClusterSettingEntity defSetting = getClusterDefSetting(false);
- if (defSetting != null) {
- maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
- }
TopicCtrlEntity entity = new TopicCtrlEntity(opEntity, topicName,
- TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB);
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
return innAddOrUpdTopicCtrlConf(false, true, entity, strBuff, result);
}
@@ -524,26 +524,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
strBuff.delete(0, strBuff.length());
return result.isSuccess();
}
- if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED
- || entity.getMaxMsgSizeInMB() == TBaseConstants.META_VALUE_UNDEFINED) {
- int topicId = entity.getTopicId();
- int maxMsgSizeInMB = entity.getMaxMsgSizeInMB();
- if (topicId == TBaseConstants.META_VALUE_UNDEFINED) {
- topicId = TServerConstants.TOPIC_ID_DEF;
- }
- if (maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) {
- maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
- ClusterSettingEntity defSetting = getClusterDefSetting(false);
- if (defSetting != null) {
- maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
- }
- }
- newEntity = new TopicCtrlEntity(entity, entity.getTopicName(), topicId, maxMsgSizeInMB);
- topicCtrlMapper.addTopicCtrlConf(newEntity, strBuff, result);
-
- } else {
- topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
- }
+ entity.fillEmptyValues(getClusterDefSetting(false));
+ topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result);
} else {
if (isAddOpOrOnlyAdd) {
if (chkConsistent) {
@@ -609,7 +591,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
Integer topicLockId = null;
Integer brokerLockId = null;
// add topic control configure
- addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+ BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
+ addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result);
// execute add or update operation
try {
// lock topicName meta-lock
@@ -653,11 +636,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
newProps.updModifyInfo(brokerEntity.getTopicProps());
newEntity = new TopicDeployEntity(entity,
entity.getBrokerId(), entity.getTopicName(), newProps);
- int topicId = entity.getTopicId();
- if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED) {
- topicId = TServerConstants.TOPIC_ID_DEF;
- }
- newEntity.updModifyInfo(entity.getDataVerId(), topicId,
+ newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(),
brokerEntity.getBrokerPort(), brokerEntity.getBrokerIp(),
entity.getTopicStatus(), entity.getTopicProps());
topicDeployMapper.addTopicDeployConf(newEntity, strBuff, result);
@@ -973,6 +952,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
strBuff.delete(0, strBuff.length());
return result.isSuccess();
}
+ entity.fillEmptyValues();
groupResCtrlMapper.addGroupResCtrlConf(entity, strBuff, result);
} else {
if (isAddOpOrOnlyAdd) {
@@ -1116,9 +1096,10 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
GroupConsumeCtrlEntity newEntity;
String printPrefix = "[addConsumeCtrlConf], ";
// append topic control configure
- addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result);
+ BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
+ addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result);
// append group control configure
- addGroupCtrlConfIfAbsent(entity, entity.getGroupName(), strBuff, result);
+ addGroupCtrlConfIfAbsent(opEntity, entity.getGroupName(), strBuff, result);
// execute add or update operation
try {
// lock topicName meta-lock
@@ -1138,6 +1119,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper {
strBuff.delete(0, strBuff.length());
return result.isSuccess();
}
+ entity.fillEmptyValues();
consumeCtrlMapper.addGroupConsumeCtrlConf(entity, strBuff, result);
} else {
if (isAddOpOrOnlyAdd) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
index e16440d8e..3457ee27d 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
@@ -147,6 +148,32 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper {
return retEntityMap;
}
+    /**
+     * Collect the max message size, in bytes, cached for each requested topic.
+     *
+     * Null topic names and topics without an entry in the topic control cache
+     * are left out of the result. A cached topic whose size is undefined or
+     * equal to the supplied default is stored with a null value; every other
+     * cached topic is stored with its own size.
+     *
+     * @param defMaxMsgSizeInB the default max message size in bytes
+     * @param topicNameSet     the topic names to look up; may be null or empty
+     * @return a new map from topic name to its size in bytes, or to null as
+     *         described above; empty when no topic names are supplied
+     */
+    @Override
+    public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB,
+                                                         Set<String> topicNameSet) {
+        final Map<String, Integer> sizeByTopic = new HashMap<>();
+        if (topicNameSet != null) {
+            for (String topicName : topicNameSet) {
+                // skip null names before touching the cache
+                final TopicCtrlEntity cached =
+                        (topicName == null) ? null : topicCtrlCache.get(topicName);
+                if (cached == null) {
+                    continue;
+                }
+                if (cached.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED
+                        && cached.getMaxMsgSizeInB() != defMaxMsgSizeInB) {
+                    sizeByTopic.put(topicName, cached.getMaxMsgSizeInB());
+                } else {
+                    // undefined or same as the default: keep the key, store no value
+                    sizeByTopic.put(topicName, null);
+                }
+            }
+        }
+        return sizeByTopic;
+    }
+
/**
* Clear cached data
*/
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
index f1442d76c..938b2537c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java
@@ -279,7 +279,7 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl {
return clusterGroupVO;
}
// translate replication group info to ClusterGroupVO structure
- Tuple2<Boolean, List<ClusterNodeVO>> transResult =
+ Tuple2<Boolean, List<ClusterNodeVO>> transResult =
transReplicateNodes(replicationGroup);
clusterGroupVO.setNodeData(transResult.getF1());
clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 4418fa68e..9fcc779d7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
import org.apache.inlong.tubemq.server.common.TubeServerVersion;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
@@ -271,6 +272,17 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
.append(",\"reqSourceCount\":").append(reqSourceCount)
.append(",\"curSourceCount\":").append(curSourceCount)
.append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
+ } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+ Tuple2<Long, List<String>> metaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+ sBuffer.append(",\"topicMetaId\":").append(metaInfoTuple.getF0())
+ .append(",\"metaDetails\":[");
+ for (String itemInfo : metaInfoTuple.getF1()) {
+ if (itemCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("\"").append(itemInfo).append("\"");
+ }
+ sBuffer.append("]");
}
sBuffer.append(",\"rebInfo\":{");
if (balanceStatus == -2) {
@@ -515,6 +527,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
}
strBuffer.append("]");
}
+ } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+ strBuffer.append(",\"sourceCount\":").append(consumer.getSourceCount())
+ .append(",\"nodeId\":").append(consumer.getNodeId());
}
Map<String, Map<String, Partition>> topicSubMap =
currentSubInfoMap.get(consumer.getConsumerId());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index b92a53324..344cb0f71 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -379,15 +379,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
defMetaDataService.getBrokerTopicConfigInfo(brokerIds);
// build query result
int dataCount = 0;
+ int topicCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) {
if (dataCount++ > 0) {
sBuffer.append(",");
}
- sBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":[");
- int topicCnt = 0;
- Set<String> topicSet = entry.getValue();
- for (String topic : topicSet) {
+ topicCnt = 0;
+ sBuffer.append("{\"brokerId\":").append(entry.getKey())
+ .append(",\"topicName\":[");
+ for (String topic : entry.getValue()) {
if (topicCnt++ > 0) {
sBuffer.append(",");
}