You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/04/21 12:13:31 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-598]Adjust
WebTopicCtrlHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 7db3f69 [INLONG-598]Adjust WebTopicCtrlHandler class implementation
7db3f69 is described below
commit 7db3f69ef5487fa159bcdfd4c01834e1dbaa6b1d
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Apr 20 22:51:18 2021 +0800
[INLONG-598]Adjust WebTopicCtrlHandler class implementation
---
.../tubemq/corebase/utils/KeyBuilderUtils.java | 5 +-
.../tubemq/server/common/fielddef/WebFieldDef.java | 6 +-
.../server/common/utils/WebParameterUtils.java | 70 ++-
.../server/master/metamanage/MetaDataManager.java | 621 +++++++++------------
.../metastore/BdbMetaStoreServiceImpl.java | 78 +--
.../metamanage/metastore/MetaStoreService.java | 22 +-
.../metastore/dao/entity/BrokerConfEntity.java | 6 +-
.../metastore/dao/entity/GroupBlackListEntity.java | 172 ------
.../dao/entity/GroupConsumeCtrlEntity.java | 4 +
.../metastore/dao/entity/GroupResCtrlEntity.java | 6 +-
.../metastore/dao/mapper/GroupBlackListMapper.java | 49 --
.../dao/mapper/GroupConsumeCtrlMapper.java | 2 +
.../impl/bdbimpl/BdbGroupBlackListMapperImpl.java | 371 ------------
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 7 +
.../master/web/handler/WebBrokerConfHandler.java | 471 +++++++---------
.../web/handler/WebGroupConsumeCtrlHandler.java | 621 ++++++---------------
.../master/web/handler/WebGroupResCtrlHandler.java | 326 ++++++-----
.../master/web/handler/WebTopicCtrlHandler.java | 245 +++-----
18 files changed, 1021 insertions(+), 2061 deletions(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/KeyBuilderUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/KeyBuilderUtils.java
index 2e6db15..dd6e119 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/KeyBuilderUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/KeyBuilderUtils.java
@@ -38,10 +38,11 @@ public class KeyBuilderUtils {
return ret;
}
String[] items = recordKey.split(TokenConstants.ATTR_SEP);
+ // return [topicName, groupName]
if (items.length < 2) {
- ret.setF0AndF1(items[0], items[1]);
+ ret.setF0AndF1(items[0], "");
} else {
- ret.setF0AndF1(items[0], items[0]);
+ ret.setF0AndF1(items[0], items[1]);
}
return ret;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 1b088e1..269cd6c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -224,8 +224,10 @@ public enum WebFieldDef {
WebFieldType.BOOLEAN, "With topic deploy info."),
TOPICCTRLSET(80, "topicCtrlJsonSet", "tCtrlSet", WebFieldType.JSONSET,
- "The topic control info set that needs to be added or modified");
-
+ "The topic control info set that needs to be added or modified"),
+ GROUPRESCTRLSET(81, "groupResCtrlJsonSet", "gResCtrlSet",
+ WebFieldType.JSONSET,
+ "The group resource control info set that needs to be added or modified");
public final int id;
public final String name;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 657fd69..d149cd0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -248,6 +248,22 @@ public class WebParameterUtils {
return result.success;
}
int qryPriorityId = (int) result.retData1;
+ return checkQryPriorityIdValue(qryPriorityId, result);
+ }
+
+ public static boolean getQryPriorityIdParameter(Map<String, String> keyValueMap,
+ boolean required, int defValue,
+ int minValue, ProcessResult result) {
+ if (!getIntParamValue(keyValueMap, WebFieldDef.QRYPRIORITYID,
+ required, defValue, minValue, result)) {
+ return result.success;
+ }
+ int qryPriorityId = (int) result.retData1;
+ return checkQryPriorityIdValue(qryPriorityId, result);
+ }
+
+ private static boolean checkQryPriorityIdValue(int qryPriorityId,
+ ProcessResult result) {
if (qryPriorityId > 303 || qryPriorityId < 101) {
result.setFailResult(new StringBuilder(512)
.append("Illegal value in ").append(WebFieldDef.QRYPRIORITYID.name)
@@ -270,7 +286,8 @@ public class WebParameterUtils {
.append(" must in ").append(allowedPriorityVal).toString());
return false;
}
- return true;
+ result.setSuccResult(qryPriorityId);
+ return result.isSuccess();
}
/**
@@ -1120,7 +1137,16 @@ public class WebParameterUtils {
result.setSuccResult(defValue);
return result.success;
}
- result.setSuccResult(Boolean.parseBoolean(paramValue));
+ if (paramValue.equalsIgnoreCase("true")
+ || paramValue.equalsIgnoreCase("false")) {
+ result.setSuccResult(Boolean.parseBoolean(paramValue));
+ } else {
+ try {
+ result.setSuccResult(!(Long.parseLong(paramValue) == 0));
+ } catch (Throwable e) {
+ result.setSuccResult(defValue);
+ }
+ }
return result.success;
}
@@ -1148,7 +1174,16 @@ public class WebParameterUtils {
result.setSuccResult(defValue);
return result.success;
}
- result.setSuccResult(Boolean.parseBoolean(paramValue));
+ if (paramValue.equalsIgnoreCase("true")
+ || paramValue.equalsIgnoreCase("false")) {
+ result.setSuccResult(Boolean.parseBoolean(paramValue));
+ } else {
+ try {
+ result.setSuccResult(!(Long.parseLong(paramValue) == 0));
+ } catch (Throwable e) {
+ result.setSuccResult(defValue);
+ }
+ }
return result.success;
}
@@ -2551,8 +2586,6 @@ public class WebParameterUtils {
public static int getAndCheckFlowRules(HttpServletRequest req,
String defValue,
ProcessResult result) {
- int ruleCnt = 0;
- StringBuilder strBuffer = new StringBuilder(512);
// get parameter value
String paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.name);
if (paramValue == null) {
@@ -2560,8 +2593,33 @@ public class WebParameterUtils {
}
if (TStringUtils.isBlank(paramValue)) {
result.setSuccResult(defValue);
- return ruleCnt;
+ return 0;
}
+ paramValue = paramValue.trim();
+ return validFlowRuleValue(paramValue, result);
+ }
+
+ // translate rule info to json format string
+ public static int getAndCheckFlowRules(Map<String, String> keyValueMap,
+ String defValue,
+ ProcessResult result) {
+ // get parameter value
+ String paramValue = keyValueMap.get(WebFieldDef.FLOWCTRLSET.name);
+ if (paramValue == null) {
+ paramValue = keyValueMap.get(WebFieldDef.FLOWCTRLSET.shortName);
+ }
+ if (TStringUtils.isBlank(paramValue)) {
+ result.setSuccResult(defValue);
+ return 0;
+ }
+ paramValue = paramValue.trim();
+ return validFlowRuleValue(paramValue, result);
+ }
+
+ private static int validFlowRuleValue(String paramValue,
+ ProcessResult result) {
+ int ruleCnt = 0;
+ StringBuilder strBuffer = new StringBuilder(512);
strBuffer.append("[");
paramValue = paramValue.trim();
List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index 89425a0..cd5f46b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -51,14 +51,12 @@ import org.apache.tubemq.server.master.metamanage.metastore.MetaStoreService;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TargetValidResult;
import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
import org.apache.tubemq.server.master.web.handler.TopicProcessResult;
@@ -242,27 +240,25 @@ public class MetaDataManager implements Server {
* Check if consume target is authorization or not
*
* @param consumerId
- * @param consumerGroupName
+ * @param groupName
* @param reqTopicSet
- * @param reqTopicConditions
- * @param sb
+ * @param reqTopicCondMap
+ * @param sBuffer
* @return
*/
- public TargetValidResult isConsumeTargetAuthorized(String consumerId,
- String consumerGroupName,
- Set<String> reqTopicSet,
- Map<String, TreeSet<String>> reqTopicConditions,
- final StringBuilder sb) {
- // #lizard forgives
+ public boolean isConsumeTargetAuthorized(String consumerId, String groupName,
+ Set<String> reqTopicSet,
+ Map<String, TreeSet<String>> reqTopicCondMap,
+ StringBuilder sBuffer, ProcessResult result) {
// check topic set
if ((reqTopicSet == null) || (reqTopicSet.isEmpty())) {
- return new TargetValidResult(false, TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary subscribed topic data");
+ return result.isSuccess();
}
-
- if ((reqTopicConditions != null) && (!reqTopicConditions.isEmpty())) {
+ if ((reqTopicCondMap != null) && (!reqTopicCondMap.isEmpty())) {
// check if request topic set is all in the filter topic set
- Set<String> condTopics = reqTopicConditions.keySet();
+ Set<String> condTopics = reqTopicCondMap.keySet();
List<String> unSetTopic = new ArrayList<>();
for (String topic : condTopics) {
if (!reqTopicSet.contains(topic)) {
@@ -270,146 +266,123 @@ public class MetaDataManager implements Server {
}
}
if (!unSetTopic.isEmpty()) {
- TargetValidResult validResult =
- new TargetValidResult(false, TErrCodeConstants.BAD_REQUEST,
- sb.append("Filter's Topic not subscribed :")
- .append(unSetTopic).toString());
- sb.delete(0, sb.length());
- return validResult;
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Filter's Topic not subscribed :")
+ .append(unSetTopic).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
}
// check if consumer group is in the blacklist
- List<String> fbdTopicList = new ArrayList<>();
- Set<String> naTopicSet =
- metaStoreService.getGrpBlkLstNATopicByGroupName(consumerGroupName);
- if (naTopicSet != null) {
- for (String topicItem : reqTopicSet) {
- if (naTopicSet.contains(topicItem)) {
- fbdTopicList.add(topicItem);
- }
+ GroupResCtrlEntity resCtrlEntity =
+ metaStoreService.getGroupResCtrlConf(groupName);
+ if (resCtrlEntity != null && !resCtrlEntity.isEnableConsume()) {
+ if (!resCtrlEntity.isEnableConsume()) {
+ result.setFailResult(TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
+ sBuffer.append("[unAuthorized Group] ").append(consumerId)
+ .append("'s consumerGroup in blackList by administrator, reason is ")
+ .append(resCtrlEntity.getDisableReason()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
}
- if (!fbdTopicList.isEmpty()) {
- return new TargetValidResult(false, TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
- sb.append("[unAuthorized Group] ").append(consumerId)
- .append("'s consumerGroup in blackList by administrator, topics : ")
- .append(fbdTopicList).toString());
- }
- // check if topic enabled authorization
- ArrayList<String> enableAuthTopicList = new ArrayList<>();
- ArrayList<String> unAuthTopicList = new ArrayList<>();
+ // check if group enable consume
+ Set<String> disableCsmTopicSet = new HashSet<>();
+ Set<String> enableFltCsmTopicSet = new HashSet<>();
for (String topicItem : reqTopicSet) {
if (TStringUtils.isBlank(topicItem)) {
continue;
}
-
- TopicCtrlEntity topicEntity =
- metaStoreService.getTopicCtrlConf(topicItem);
+ TopicCtrlEntity topicEntity = metaStoreService.getTopicCtrlConf(topicItem);
if (topicEntity == null) {
continue;
}
if (topicEntity.isAuthCtrlEnable()) {
- enableAuthTopicList.add(topicItem);
- //check if consumer group is allowed to consume
+ //check if consume group is allowed to consume
GroupConsumeCtrlEntity ctrlEntity =
- metaStoreService.getConsumeCtrlByGroupAndTopic(
- consumerGroupName, topicItem);
+ metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicItem);
if (ctrlEntity == null) {
- unAuthTopicList.add(topicItem);
+ disableCsmTopicSet.add(topicItem);
+ }
+ //check if consume group is required filter consume
+ if (ctrlEntity.isEnableFilterConsume()) {
+ enableFltCsmTopicSet.add(topicItem);
}
}
}
- if (!unAuthTopicList.isEmpty()) {
- return new TargetValidResult(false, TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
- sb.append("[unAuthorized Group] ").append(consumerId)
+ if (!disableCsmTopicSet.isEmpty()) {
+ result.setFailResult(TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
+ sBuffer.append("[unAuthorized Group] ").append(consumerId)
.append("'s consumerGroup not authorized by administrator, unAuthorizedTopics : ")
- .append(unAuthTopicList).toString());
- }
- if (enableAuthTopicList.isEmpty()) {
- return new TargetValidResult(true, 200, "Ok!");
- }
- boolean isAllowed =
- checkRestrictedTopics(consumerGroupName,
- consumerId, enableAuthTopicList, reqTopicConditions, sb);
- if (isAllowed) {
- return new TargetValidResult(true, 200, "Ok!");
- } else {
- return new TargetValidResult(false,
- TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN, sb.toString());
+ .append(disableCsmTopicSet).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
+ // check if group enable filter consume
+ return checkFilterRstrTopics(groupName, consumerId,
+ enableFltCsmTopicSet, reqTopicCondMap, sBuffer, result);
}
- private boolean checkRestrictedTopics(final String groupName, final String consumerId,
- List<String> enableAuthTopicList,
- Map<String, TreeSet<String>> reqTopicConditions,
- final StringBuilder sb) {
- List<String> restrictedTopics = new ArrayList<>();
- Map<String, GroupConsumeCtrlEntity> authorizedFilterCondMap = new HashMap<>();
- for (String topicName : enableAuthTopicList) {
- GroupConsumeCtrlEntity ctrlEntity =
- metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
- if (ctrlEntity != null && ctrlEntity.isEnableFilterConsume()) {
- restrictedTopics.add(topicName);
- authorizedFilterCondMap.put(topicName, ctrlEntity);
- }
- }
- if (restrictedTopics.isEmpty()) {
- return true;
+ private boolean checkFilterRstrTopics(final String groupName, final String consumerId,
+ Set<String> enableFltCsmTopicSet,
+ Map<String, TreeSet<String>> reqTopicCondMap,
+ StringBuilder sBuffer, ProcessResult result) {
+ if (enableFltCsmTopicSet == null && enableFltCsmTopicSet.isEmpty()) {
+ result.setSuccResult("Ok!");
+ return result.isSuccess();
}
- boolean isAllowed = true;
- for (String tmpTopic : restrictedTopics) {
- GroupConsumeCtrlEntity ilterCondEntity =
- authorizedFilterCondMap.get(tmpTopic);
- if (ilterCondEntity == null) {
+ GroupConsumeCtrlEntity ctrlEntity;
+ for (String topicName : enableFltCsmTopicSet) {
+ ctrlEntity =
+ metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+ if (ctrlEntity == null || !ctrlEntity.isEnableFilterConsume()) {
continue;
}
- String allowedConds = ilterCondEntity.getFilterCondStr();
- TreeSet<String> condItemSet = reqTopicConditions.get(tmpTopic);
- if (allowedConds.length() == 2
- && allowedConds.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
- isAllowed = false;
- sb.append("[Restricted Group] ")
- .append(consumerId)
- .append(" : ").append(groupName)
- .append(" not allowed to consume any data of topic ")
- .append(tmpTopic);
- break;
- } else {
- if (condItemSet == null
- || condItemSet.isEmpty()) {
- isAllowed = false;
- sb.append("[Restricted Group] ")
- .append(consumerId)
- .append(" : ").append(groupName)
- .append(" must set the filter conditions of topic ")
- .append(tmpTopic);
- break;
- } else {
- Map<String, List<String>> unAuthorizedCondMap =
- new HashMap<>();
- for (String item : condItemSet) {
- if (!allowedConds.contains(sb.append(TokenConstants.ARRAY_SEP)
- .append(item).append(TokenConstants.ARRAY_SEP).toString())) {
- isAllowed = false;
- List<String> unAuthConds = unAuthorizedCondMap.get(tmpTopic);
- if (unAuthConds == null) {
- unAuthConds = new ArrayList<>();
- unAuthorizedCondMap.put(tmpTopic, unAuthConds);
- }
- unAuthConds.add(item);
- }
- sb.delete(0, sb.length());
- }
- if (!isAllowed) {
- sb.append("[Restricted Group] ").append(consumerId)
- .append(" : unAuthorized filter conditions ")
- .append(unAuthorizedCondMap);
- break;
+ String allowedCondStr = ctrlEntity.getFilterCondStr();
+ if (allowedCondStr.length() == 2
+ && allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
+ result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+ sBuffer.append("[Restricted Group] ").append(consumerId)
+ .append(" : ").append(groupName)
+ .append(" not allowed to consume any data of topic ")
+ .append(topicName).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ TreeSet<String> condItemSet = reqTopicCondMap.get(topicName);
+ if (condItemSet == null || condItemSet.isEmpty()) {
+ result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+ sBuffer.append("[Restricted Group] ").append(consumerId)
+ .append(" : ").append(groupName)
+ .append(" must set the filter conditions of topic ")
+ .append(topicName).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ Map<String, List<String>> unAuthorizedCondMap = new HashMap<>();
+ for (String item : condItemSet) {
+ if (!allowedCondStr.contains(sBuffer.append(TokenConstants.ARRAY_SEP)
+ .append(item).append(TokenConstants.ARRAY_SEP).toString())) {
+ List<String> unAuthConds = unAuthorizedCondMap.get(topicName);
+ if (unAuthConds == null) {
+ unAuthConds = new ArrayList<>();
+ unAuthorizedCondMap.put(topicName, unAuthConds);
}
+ unAuthConds.add(item);
}
+ sBuffer.delete(0, sBuffer.length());
+ }
+ if (!unAuthorizedCondMap.isEmpty()) {
+ result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+ sBuffer.append("[Restricted Group] ").append(consumerId)
+ .append(" : unAuthorized filter conditions ")
+ .append(unAuthorizedCondMap).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
}
- return isAllowed;
+ result.setSuccResult("Ok!");
+ return result.isSuccess();
}
@@ -422,30 +395,53 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public BrokerProcessResult addBrokerConfig(BaseEntity opInfoEntity, int brokerId,
- String brokerIp, int brokerPort,
- int brokerTlsPort, int brokerWebPort,
- int regionId, int groupId,
- ManageStatus manageStatus,
- TopicPropGroup topicProps,
- StringBuilder sBuilder,
- ProcessResult result) {
- BrokerConfEntity entity = new BrokerConfEntity(opInfoEntity);
- entity.setBrokerIdAndIp(brokerId, brokerIp);
- entity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort, brokerTlsPort,
- brokerWebPort, regionId, groupId, manageStatus, topicProps);
- return addBrokerConfig(entity, sBuilder, result);
- }
-
- public BrokerProcessResult addBrokerConfig(BrokerConfEntity entity,
- StringBuilder sBuilder,
- ProcessResult result) {
- if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) != null
- || metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- DataOpErrCode.DERR_EXISTED.getDescription());
+ public BrokerProcessResult addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opInfoEntity,
+ int brokerId, String brokerIp, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
+ int regionId, int groupId,
+ ManageStatus mngStatus,
+ TopicPropGroup topicProps,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ BrokerConfEntity entity =
+ new BrokerConfEntity(opInfoEntity, brokerId, brokerIp);
+ entity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, regionId, groupId, mngStatus, topicProps);
+ return addOrUpdBrokerConfig(isAddOp, entity, sBuilder, result);
+ }
+
+ public BrokerProcessResult addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
+ StringBuilder sBuffer, ProcessResult result) {
+ if (isAddOp) {
+ if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) == null &&
+ metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) == null) {
+ metaStoreService.addBrokerConf(entity, sBuffer, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ sBuffer.append("Duplicated broker configure record! query index is :")
+ .append("brokerId=").append(entity.getBrokerId())
+ .append(",brokerIp=").append(entity.getBrokerIp()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
} else {
- metaStoreService.addBrokerConf(entity, sBuilder, result);
+ BrokerConfEntity curEntity =
+ metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ } else {
+ BrokerConfEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (entity.updModifyInfo(entity.getDataVerId(), entity.getBrokerPort(),
+ entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
+ entity.getRegionId(), entity.getGroupId(),
+ entity.getManageStatus(), entity.getTopicProps())) {
+ metaStoreService.updBrokerConf(newEntity, sBuffer, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ DataOpErrCode.DERR_UNCHANGED.getDescription());
+ }
+ }
}
return new BrokerProcessResult(entity.getBrokerId(), entity.getBrokerIp(), result);
}
@@ -1364,66 +1360,54 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public List<TopicProcessResult> addOrUpdTopicCtrlConf(BaseEntity opInfoEntity,
- Set<String> topicNameSet,
- int topicNameId,
- Boolean enableTopicAuth,
- int maxMsgSizeInMB,
- StringBuilder sBuffer,
- ProcessResult result) {
- TopicCtrlEntity curEntity;
- TopicCtrlEntity newEntity;
- List<TopicProcessResult> retInfoList = new ArrayList<>();
- for (String topicName : topicNameSet) {
- curEntity = metaStoreService.getTopicCtrlConf(topicName);
- if (curEntity == null) {
- newEntity = new TopicCtrlEntity(opInfoEntity, topicName);
- newEntity.updModifyInfo(opInfoEntity.getDataVerId(),
- topicNameId, maxMsgSizeInMB, enableTopicAuth);
- metaStoreService.addTopicCtrlConf(newEntity, sBuffer, result);
- } else {
- newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opInfoEntity);
- if (!newEntity.updModifyInfo(opInfoEntity.getDataVerId(),
- topicNameId, maxMsgSizeInMB, enableTopicAuth)) {
- result.setSuccResult(null);
- retInfoList.add(new TopicProcessResult(0, topicName, result));
- continue;
- }
- metaStoreService.updTopicCtrlConf(newEntity, sBuffer, result);
- }
- retInfoList.add(new TopicProcessResult(0, topicName, result));
- }
- return retInfoList;
+ public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ String topicName, int topicNameId,
+ Boolean enableTopicAuth, int maxMsgSizeInMB,
+ StringBuilder sBuffer, ProcessResult result) {
+ TopicCtrlEntity entity =
+ new TopicCtrlEntity(opEntity, topicName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ topicNameId, maxMsgSizeInMB, enableTopicAuth);
+ return addOrUpdTopicCtrlConf(isAddOp, entity, sBuffer, result);
}
/**
* Add or Update topic control configure info
*
- * @param inEntity the topic control info entity will be add
+ * @param entity the topic control info entity will be add
* @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public TopicProcessResult addOrUpdTopicCtrlConf(TopicCtrlEntity inEntity,
- StringBuilder sBuffer,
- ProcessResult result) {
+ public TopicProcessResult addOrUpdTopicCtrlConf(boolean isAddOp, TopicCtrlEntity entity,
+ StringBuilder sBuffer, ProcessResult result) {
TopicCtrlEntity curEntity =
- metaStoreService.getTopicCtrlConf(inEntity.getTopicName());
- if (curEntity == null) {
- metaStoreService.addTopicCtrlConf(inEntity, sBuffer, result);
+ metaStoreService.getTopicCtrlConf(entity.getTopicName());
+ if (isAddOp) {
+ if (curEntity == null) {
+ metaStoreService.addTopicCtrlConf(entity, sBuffer, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
+ }
} else {
- TopicCtrlEntity newEntity2 = curEntity.clone();
- newEntity2.updBaseModifyInfo(inEntity);
- if (!newEntity2.updModifyInfo(inEntity.getDataVerId(), inEntity.getTopicId(),
- inEntity.getMaxMsgSizeInMB(), inEntity.isAuthCtrlEnable())) {
- result.setSuccResult(null);
- return new TopicProcessResult(0, inEntity.getTopicName(), result);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ } else {
+ TopicCtrlEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(),
+ entity.getMaxMsgSizeInMB(), entity.isAuthCtrlEnable())) {
+ metaStoreService.updTopicCtrlConf(newEntity, sBuffer, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ DataOpErrCode.DERR_UNCHANGED.getDescription());
+ }
}
- metaStoreService.updTopicCtrlConf(newEntity2, sBuffer, result);
}
- return new TopicProcessResult(0, inEntity.getTopicName(), result);
+ return new TopicProcessResult(0, entity.getTopicName(), result);
}
/**
@@ -1609,18 +1593,19 @@ public class MetaDataManager implements Server {
// //////////////////////////////////////////////////////////////////////////////
- public GroupProcessResult addGroupResCtrlConf(BaseEntity opEntity, String groupName,
- Boolean consumeEnable, String disableRsn,
- Boolean resCheckEnable, int allowedBClientRate,
- int qryPriorityId, Boolean flowCtrlEnable,
- int flowRuleCnt, String flowCtrlInfo,
- StringBuilder sBuilder, ProcessResult result) {
+ public GroupProcessResult addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
+ String groupName, Boolean consumeEnable,
+ String disableRsn, Boolean resCheckEnable,
+ int allowedBClientRate, int qryPriorityId,
+ Boolean flowCtrlEnable, int flowRuleCnt,
+ String flowCtrlInfo, StringBuilder sBuilder,
+ ProcessResult result) {
GroupResCtrlEntity entity =
new GroupResCtrlEntity(opEntity, groupName);
entity.updModifyInfo(opEntity.getDataVerId(),
consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
- return addGroupResCtrlConf(entity, sBuilder, result);
+ return addOrUpdGroupResCtrlConf(isAddOp, entity, sBuilder, result);
}
/**
@@ -1631,47 +1616,38 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public GroupProcessResult addGroupResCtrlConf(GroupResCtrlEntity entity,
- StringBuilder sBuilder,
- ProcessResult result) {
- if (metaStoreService.getGroupResCtrlConf(entity.getGroupName()) != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- DataOpErrCode.DERR_EXISTED.getDescription());
- } else {
- metaStoreService.addGroupResCtrlConf(entity, sBuilder, result);
- }
- return new GroupProcessResult(entity.getGroupName(), null, result);
- }
-
- /**
- * Update group resource control configure
- *
- * @return true if success otherwise false
- */
- public GroupProcessResult updGroupResCtrlConf(BaseEntity opEntity, String groupName,
- Boolean consumeEnable, String disableRsn,
- Boolean resCheckEnable, int allowedBClientRate,
- int qryPriorityId, Boolean flowCtrlEnable,
- int flowRuleCnt, String flowCtrlInfo,
- StringBuilder sBuilder, ProcessResult result) {
+ public GroupProcessResult addOrUpdGroupResCtrlConf(boolean isAddOp,
+ GroupResCtrlEntity entity,
+ StringBuilder sBuilder,
+ ProcessResult result) {
GroupResCtrlEntity curEntity =
- metaStoreService.getGroupResCtrlConf(groupName);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- return new GroupProcessResult(groupName, null, result);
- }
- GroupResCtrlEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opEntity);
- if (newEntity.updModifyInfo(opEntity.getDataVerId(), consumeEnable, disableRsn,
- resCheckEnable, allowedBClientRate, qryPriorityId, flowCtrlEnable,
- flowRuleCnt, flowCtrlInfo)) {
- metaStoreService.updGroupResCtrlConf(newEntity, sBuilder, result);
+ metaStoreService.getGroupResCtrlConf(entity.getGroupName());
+ if (isAddOp) {
+ if (curEntity == null) {
+ metaStoreService.addGroupResCtrlConf(entity, sBuilder, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
+ }
} else {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- DataOpErrCode.DERR_UNCHANGED.getDescription());
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ } else {
+ GroupResCtrlEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
+ entity.getDisableReason(), entity.isEnableResCheck(),
+ entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
+ entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo())) {
+ metaStoreService.updGroupResCtrlConf(newEntity, sBuilder, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ DataOpErrCode.DERR_UNCHANGED.getDescription());
+ }
+ }
}
- return new GroupProcessResult(groupName, null, result);
+ return new GroupProcessResult(entity.getGroupName(), null, result);
}
/**
@@ -1679,22 +1655,32 @@ public class MetaDataManager implements Server {
*
* @param operator operator
* @param groupNames the group will be deleted
- * @param strBuffer the print info string buffer
+ * @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
public List<GroupProcessResult> delGroupResCtrlConf(String operator,
Set<String> groupNames,
- StringBuilder strBuffer,
+ StringBuilder sBuffer,
ProcessResult result) {
List<GroupProcessResult> retInfo = new ArrayList<>();
if (groupNames == null || groupNames.isEmpty()) {
return retInfo;
}
for (String groupName : groupNames) {
- metaStoreService.delGroupResCtrlConf(operator, groupName, strBuffer, result);
+ if (metaStoreService.hasGroupConsumeCtrlConf(groupName)) {
+ result.setFailResult(DataOpErrCode.DERR_CONDITION_LACK.getCode(),
+ sBuffer.append("Group ").append(groupName)
+ .append(" has consume control configures,")
+ .append(", please delete consume control configures first!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ retInfo.add(new GroupProcessResult(groupName, null, result));
+ continue;
+ }
+ metaStoreService.delGroupResCtrlConf(operator, groupName, sBuffer, result);
retInfo.add(new GroupProcessResult(groupName, null, result));
- strBuffer.delete(0, strBuffer.length());
+ sBuffer.delete(0, sBuffer.length());
result.clear();
}
return retInfo;
@@ -1709,30 +1695,52 @@ public class MetaDataManager implements Server {
return this.metaStoreService.getGroupResCtrlConf(groupName);
}
- public GroupProcessResult addGroupConsumeCtrlInfo(BaseEntity opInfoEntity, String groupName,
- String topicName, Boolean enableCsm,
- String disableRsn, Boolean enableFilter,
- String filterCondStr, StringBuilder sBuilder,
- ProcessResult result) {
+ public GroupProcessResult addOrUpdGroupConsumeCtrlInfo(boolean isAddOp, BaseEntity opEntity,
+ String groupName, String topicName,
+ Boolean enableCsm, String disableRsn,
+ Boolean enableFlt, String fltCondStr,
+ StringBuilder sBuilder,
+ ProcessResult result) {
GroupConsumeCtrlEntity entity =
- new GroupConsumeCtrlEntity(opInfoEntity, groupName, topicName);
- entity.updModifyInfo(opInfoEntity.getDataVerId(),
- enableCsm, disableRsn, enableFilter, filterCondStr);
- return addGroupConsumeCtrlInfo(entity, sBuilder, result);
+ new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ enableCsm, disableRsn, enableFlt, fltCondStr);
+ return addOrUpdGroupConsumeCtrlInfo(isAddOp, entity, sBuilder, result);
}
- public GroupProcessResult addGroupConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
- StringBuilder sBuilder,
- ProcessResult result) {
+ public GroupProcessResult addOrUpdGroupConsumeCtrlInfo(boolean isAddOp,
+ GroupConsumeCtrlEntity entity,
+ StringBuilder sBuilder,
+ ProcessResult result) {
+ // add group resource control record
if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuilder, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- if (metaStoreService.getConsumeCtrlByGroupAndTopic(
- entity.getGroupName(), entity.getTopicName()) != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- DataOpErrCode.DERR_EXISTED.getDescription());
+ GroupConsumeCtrlEntity curEntity =
+ metaStoreService.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+ if (isAddOp) {
+ if (curEntity == null) {
+ metaStoreService.addGroupConsumeCtrlConf(entity, sBuilder, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
+ DataOpErrCode.DERR_EXISTED.getDescription());
+ }
} else {
- metaStoreService.addGroupConsumeCtrlConf(entity, sBuilder, result);
+ if (curEntity == null) {
+ result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
+ DataOpErrCode.DERR_NOT_EXIST.getDescription());
+ } else {
+ GroupConsumeCtrlEntity newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(),
+ entity.isEnableConsume(), entity.getDisableReason(),
+ entity.isEnableFilterConsume(), entity.getFilterCondStr())) {
+ metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuilder, result);
+ } else {
+ result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
+ DataOpErrCode.DERR_UNCHANGED.getDescription());
+ }
+ }
}
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
@@ -1790,21 +1798,18 @@ public class MetaDataManager implements Server {
return retInfo;
}
Set<String> rmvRecords = new HashSet<>();
- if (topicNameSet != null && !topicNameSet.isEmpty()) {
- rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByTopicName(topicNameSet));
- }
if (groupNameSet != null && !groupNameSet.isEmpty()) {
rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByGroupName(groupNameSet));
}
- GroupProcessResult retItem;
+ if (topicNameSet != null && !topicNameSet.isEmpty()) {
+ rmvRecords.addAll(metaStoreService.getConsumeCtrlKeyByTopicName(topicNameSet));
+ }
for (String recKey : rmvRecords) {
Tuple2<String, String> groupTopicTuple =
KeyBuilderUtils.splitRecKey2GroupTopic(recKey);
metaStoreService.delGroupConsumeCtrlConf(operator, recKey, strBuffer, result);
- retItem = new GroupProcessResult(groupTopicTuple.getF0(),
- groupTopicTuple.getF1(), result);
- retInfo.add(retItem);
- result.clear();
+ retInfo.add(new GroupProcessResult(groupTopicTuple.getF1(),
+ groupTopicTuple.getF0(), result));
}
return retInfo;
}
@@ -1818,7 +1823,6 @@ public class MetaDataManager implements Server {
return true;
}
resCtrlEntity = new GroupResCtrlEntity(opEntity, groupName);
- resCtrlEntity.setGroupName(groupName);
resCtrlEntity.fillDefaultValue();
return this.metaStoreService.addGroupResCtrlConf(resCtrlEntity, sBuilder, result);
}
@@ -1869,83 +1873,6 @@ public class MetaDataManager implements Server {
return metaStoreService.getConsumeCtrlByGroupAndTopic(groupNameSet, topicNameSet);
}
- /**
- * Add consume group to blacklist
- *
- * @param entity the group will be add into black list
- * @param strBuffer the print info string buffer
- * @param result the process result return
- * @return true if success otherwise false
- */
- public boolean confAddBlackListGroup(GroupBlackListEntity entity,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (metaStoreService.addGroupBlackListConf(entity, result)) {
- strBuffer.append("[confAddBlackListGroup], ")
- .append(entity.getCreateUser())
- .append(" added black list group record :")
- .append(entity.toString());
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confAddBlackListGroup], ")
- .append("failure to add black list group record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
- }
-
- /**
- * Delete black consume group list from store
- *
- * @param operator operator
- * @param groupName the blacklist record related to group
- * @param topicName the blacklist record related to topic
- * allow groupName or topicName is null,
- * but not all null
- * @return true if success
- */
- public boolean confDelBlackGroupConf(String operator,
- String groupName,
- String topicName,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (groupName == null && topicName == null) {
- result.setSuccResult(null);
- return true;
- }
- if (metaStoreService.delGroupBlackListConf(groupName, topicName, result)) {
- strBuffer.append("[confDelBlackGroupConf], ").append(operator)
- .append(" deleted black list group record by index : ")
- .append("groupName=").append(groupName)
- .append(", topicName=").append(topicName);
- logger.info(strBuffer.toString());
- } else {
- strBuffer.append("[confDelBlackGroupConf], ")
- .append("failure to delete black list group record : ")
- .append(result.getErrInfo());
- logger.warn(strBuffer.toString());
- }
- strBuffer.delete(0, strBuffer.length());
- return result.isSuccess();
- }
-
- /**
- * Get black consumer group list via query entity
- *
- * @param qryEntity the query entity
- * @return query result
- */
- public List<GroupBlackListEntity> confGetBlackGroupInfo(
- GroupBlackListEntity qryEntity) {
- return metaStoreService.getGroupBlackListConf(qryEntity);
- }
-
- public Set<String> getBlkGroupTopicInfo(String groupName) {
- return metaStoreService.getGrpBlkLstNATopicByGroupName(groupName);
- }
-
// //////////////////////////////////////////////////////////////////////////////
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 7551649..3018667 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -64,21 +64,18 @@ import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.ClusterConfigMapper;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupBlackListMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupResCtrlMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicCtrlMapper;
import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbBrokerConfigMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbClusterConfigMapperImpl;
-import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupBlackListMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupConsumeCtrlMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbGroupResCtrlMapperImpl;
import org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl.BdbTopicCtrlMapperImpl;
@@ -142,16 +139,14 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
private ClusterConfigMapper clusterConfigMapper;
// broker configure
private BrokerConfigMapper brokerConfigMapper;
- // topic configure
+ // topic deployment configure
private TopicDeployMapper topicDeployMapper;
// topic control configure
private TopicCtrlMapper topicCtrlMapper;
- // group configure
+ // group resource control configure
private GroupResCtrlMapper groupResCtrlMapper;
- // group filter configure
+ // group consume control configure
private GroupConsumeCtrlMapper groupConsumeCtrlMapper;
- // group blackList configure
- private GroupBlackListMapper groupBlackListMapper;
@@ -208,7 +203,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
topicDeployMapper.close();
groupResCtrlMapper.close();
topicCtrlMapper.close();
- groupBlackListMapper.close();
groupConsumeCtrlMapper.close();
clusterConfigMapper.close();
/* evn close */
@@ -771,65 +765,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
return groupResCtrlMapper.getGroupResCtrlConf(groupSet, qryEntity);
}
- // group blacklist api
- @Override
- public boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result) {
- // check current status
- if (!checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- return groupBlackListMapper.addGroupBlackListConf(entity, result);
- }
-
- @Override
- public boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result) {
- // check current status
- if (!checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- return groupBlackListMapper.updGroupBlackListConf(entity, result);
- }
-
- @Override
- public boolean delGroupBlackListConf(String recordKey, ProcessResult result) {
- // check current status
- if (!checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- return groupBlackListMapper.delGroupBlackListConf(recordKey, result);
- }
-
- @Override
- public boolean delGroupBlackListConf(String groupName,
- String topicName,
- ProcessResult result) {
- // check current status
- if (!checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- return groupBlackListMapper.delGroupBlackListConf(groupName, topicName, result);
- }
-
- @Override
- public List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName) {
- return groupBlackListMapper.getGrpBlkLstConfByGroupName(groupName);
- }
-
- @Override
- public List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName) {
- return groupBlackListMapper.getGrpBlkLstConfByTopicName(topicName);
- }
-
- @Override
- public Set<String> getGrpBlkLstNATopicByGroupName(String groupName) {
- return groupBlackListMapper.getNotAllowedTopicByGroupName(groupName);
- }
-
- @Override
- public List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity) {
- return groupBlackListMapper.getGroupBlackListConf(qryEntity);
- }
-
@Override
public boolean addGroupConsumeCtrlConf(GroupConsumeCtrlEntity entity,
StringBuilder strBuffer,
@@ -946,6 +881,11 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public boolean hasGroupConsumeCtrlConf(String groupName) {
+ return groupConsumeCtrlMapper.hasGroupConsumeCtrlConf(groupName);
+ }
+
+ @Override
public GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey) {
return groupConsumeCtrlMapper.getGroupConsumeCtrlConfByRecKey(recordKey);
}
@@ -1409,7 +1349,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
groupResCtrlMapper = new BdbGroupResCtrlMapperImpl(repEnv, storeConfig);
topicCtrlMapper = new BdbTopicCtrlMapperImpl(repEnv, storeConfig);
groupConsumeCtrlMapper = new BdbGroupConsumeCtrlMapperImpl(repEnv, storeConfig);
- groupBlackListMapper = new BdbGroupBlackListMapperImpl(repEnv, storeConfig);
}
/* reload metadata */
@@ -1420,7 +1359,6 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
topicDeployMapper.loadConfig();
topicCtrlMapper.loadConfig();
groupResCtrlMapper.loadConfig();
- groupBlackListMapper.loadConfig();
groupConsumeCtrlMapper.loadConfig();
relaodRunData();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 7025e8c..6524471 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -26,7 +26,6 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metamanage.keepalive.KeepAlive;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
@@ -265,25 +264,6 @@ public interface MetaStoreService extends KeepAlive, Server {
Map<String, GroupResCtrlEntity> getGroupResCtrlConf(Set<String> groupSet,
GroupResCtrlEntity qryEntity);
- // group blacklist api
- boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
-
- boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
-
- boolean delGroupBlackListConf(String groupName,
- String topicName,
- ProcessResult result);
-
- boolean delGroupBlackListConf(String recordKey, ProcessResult result);
-
- List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName);
-
- List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName);
-
- Set<String> getGrpBlkLstNATopicByGroupName(String groupName);
-
- List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity);
-
// group consume control api
/**
* Add group consume control configure
@@ -340,6 +320,8 @@ public interface MetaStoreService extends KeepAlive, Server {
boolean isTopicNameInUsed(String topicName);
+ boolean hasGroupConsumeCtrlConf(String groupName);
+
GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String groupName, String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index f79f07e..243a638 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -60,8 +60,10 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
super();
}
- public BrokerConfEntity(BaseEntity opInfoEntity) {
- super(opInfoEntity);
+ public BrokerConfEntity(BaseEntity opEntity, int brokerId, String brokerIp) {
+ super(opEntity);
+ this.brokerId = brokerId;
+ this.brokerIp = brokerIp;
}
public BrokerConfEntity(int brokerId, String brokerIp, int brokerPort,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
deleted file mode 100644
index 1ca7c11..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
-
-import java.util.Date;
-import java.util.Objects;
-import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
-
-
-/*
- * store group black list setting
- *
- */
-public class GroupBlackListEntity extends BaseEntity implements Cloneable {
-
- private String recordKey = "";
- private String topicName = "";
- private String groupName = "";
- private String reason = "";
-
- public GroupBlackListEntity() {
- super();
- }
-
- public GroupBlackListEntity(String topicName, String groupName,
- String reason, String createUser, Date createDate) {
- super(createUser, createDate);
- this.setTopicAndGroup(topicName, groupName);
- this.reason = reason;
- }
-
- public GroupBlackListEntity(BdbBlackGroupEntity bdbEntity) {
- super(bdbEntity.getDataVerId(),
- bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
- this.setTopicAndGroup(bdbEntity.getTopicName(),
- bdbEntity.getBlackGroupName());
- this.reason = bdbEntity.getReason();
- this.setAttributes(bdbEntity.getAttributes());
- }
-
- public BdbBlackGroupEntity buildBdbBlackListEntity() {
- BdbBlackGroupEntity bdbEntity =
- new BdbBlackGroupEntity(topicName, groupName,
- getAttributes(), getCreateUser(), getCreateDate());
- bdbEntity.setDataVerId(getDataVerId());
- bdbEntity.setReason(reason);
- return bdbEntity;
- }
-
- public void setTopicAndGroup(String topicName, String groupName) {
- this.topicName = topicName;
- this.groupName = groupName;
- this.recordKey = KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName);
- }
-
- public String getRecordKey() {
- return recordKey;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public String getGroupName() {
- return groupName;
- }
-
- public String getReason() {
- return reason;
- }
-
- /**
- * Check whether the specified query item value matches
- * Allowed query items:
- * groupName, topicName
- * @return true: matched, false: not match
- */
- public boolean isMatched(GroupBlackListEntity target) {
- if (target == null) {
- return true;
- }
- if (!super.isMatched(target)) {
- return false;
- }
- if ((TStringUtils.isNotBlank(target.getTopicName())
- && !target.getTopicName().equals(this.topicName))
- || (TStringUtils.isNotBlank(target.getGroupName())
- && !target.getGroupName().equals(this.groupName))) {
- return false;
- }
- return true;
- }
-
- /**
- * Serialize field to json format
- *
- * @param sBuilder build container
- * @param isLongName if return field key is long name
- * @return
- */
- @Override
- public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
- if (isLongName) {
- sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
- .append(",\"groupName\":\"").append(groupName).append("\"")
- .append(",\"reason\":\"").append(reason).append("\"");
- } else {
- sBuilder.append("{\"topic\":\"").append(topicName).append("\"")
- .append(",\"group\":\"").append(groupName).append("\"")
- .append(",\"rsn\":\"").append(reason).append("\"");
- }
- super.toWebJsonStr(sBuilder, isLongName);
- sBuilder.append("}");
- return sBuilder;
- }
-
- /**
- * check if subclass fields is equals
- *
- * @param other check object
- * @return if equals
- */
- public boolean isDataEquals(GroupBlackListEntity other) {
- return Objects.equals(recordKey, other.recordKey)
- && Objects.equals(topicName, other.topicName)
- && Objects.equals(groupName, other.groupName)
- && Objects.equals(reason, other.reason);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof GroupBlackListEntity)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- GroupBlackListEntity that = (GroupBlackListEntity) o;
- return isDataEquals(that);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), recordKey, topicName, groupName, reason);
- }
-
- @Override
- public GroupBlackListEntity clone() {
- GroupBlackListEntity copy = (GroupBlackListEntity) super.clone();
- return copy;
- }
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index b267530..8de8881 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -126,6 +126,10 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
return consumeEnable;
}
+ public boolean isEnableConsume() {
+ return consumeEnable.isEnable();
+ }
+
private void setConsumeEnable(boolean enableConsume) {
if (enableConsume) {
this.consumeEnable = EnableStatus.STATUS_ENABLE;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index 506100c..13d8c47 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -122,7 +122,7 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.allowedBrokerClientRate = 0;
this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
- this.flowCtrlInfo = TServerConstants.BLANK_FILTER_ITEM_STR;
+ this.flowCtrlInfo = TServerConstants.BLANK_FLOWCTRL_RULES;
return this;
}
@@ -183,6 +183,10 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.flowCtrlStatus = flowCtrlStatus;
}
+ public boolean isEnableConsume() {
+ return consumeEnable.isEnable();
+ }
+
public EnableStatus getConsumeEnable() {
return consumeEnable;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupBlackListMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupBlackListMapper.java
deleted file mode 100644
index 1ff8043..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupBlackListMapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metamanage.metastore.dao.mapper;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
-
-
-
-
-public interface GroupBlackListMapper extends AbstractMapper {
-
- // about group blacklist api
- boolean addGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
-
- boolean updGroupBlackListConf(GroupBlackListEntity entity, ProcessResult result);
-
- boolean delGroupBlackListConf(String groupName, String topicName, ProcessResult result);
-
- boolean delGroupBlackListConf(String recordKey, ProcessResult result);
-
- List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName);
-
- Set<String> getNotAllowedTopicByGroupName(String groupName);
-
- List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName);
-
- List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity);
-
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index 59d1edb..e94fdfd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -40,6 +40,8 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
boolean isTopicNameInUsed(String topicName);
+ boolean hasGroupConsumeCtrlConf(String groupName);
+
GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String recordKey);
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
deleted file mode 100644
index 2bdebe3..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupBlackListMapperImpl.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metamanage.metastore.impl.bdbimpl;
-
-
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.persist.EntityCursor;
-import com.sleepycat.persist.EntityStore;
-import com.sleepycat.persist.PrimaryIndex;
-import com.sleepycat.persist.StoreConfig;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
-import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
-import org.apache.tubemq.server.common.exception.LoadMetaException;
-import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
-import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupBlackListEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.mapper.GroupBlackListMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-public class BdbGroupBlackListMapperImpl implements GroupBlackListMapper {
-
- private static final Logger logger =
- LoggerFactory.getLogger(BdbGroupBlackListMapperImpl.class);
- // consumer group black list store
- private EntityStore blackGroupStore;
- private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> blackGroupIndex;
- private ConcurrentHashMap<String/* recordKey */, GroupBlackListEntity>
- groupBlackListCache = new ConcurrentHashMap<>();
- private ConcurrentHashMap<String/* topicName */, ConcurrentHashSet<String>>
- groupBlackListTopicCache = new ConcurrentHashMap<>();
- private ConcurrentHashMap<String/* groupName */, ConcurrentHashSet<String> >
- groupBlackListGroupCache = new ConcurrentHashMap<>();
-
-
-
- public BdbGroupBlackListMapperImpl(ReplicatedEnvironment repEnv, StoreConfig storeConfig) {
- blackGroupStore = new EntityStore(repEnv,
- TBDBStoreTables.BDB_BLACK_GROUP_STORE_NAME, storeConfig);
- blackGroupIndex =
- blackGroupStore.getPrimaryIndex(String.class, BdbBlackGroupEntity.class);
- }
-
- @Override
- public void close() {
- // clear cache data
- clearCacheData();
- // close store object
- if (blackGroupStore != null) {
- try {
- blackGroupStore.close();
- blackGroupStore = null;
- } catch (Throwable e) {
- logger.error("[BDB Impl] close blacklist configure failure ", e);
- }
- }
- }
-
- @Override
- public void loadConfig() throws LoadMetaException {
- long count = 0L;
- EntityCursor<BdbBlackGroupEntity> cursor = null;
- logger.info("[BDB Impl] load blacklist configure start...");
- try {
- // clear cache data
- clearCacheData();
- // read data from bdb
- cursor = blackGroupIndex.entities();
- for (BdbBlackGroupEntity bdbEntity : cursor) {
- if (bdbEntity == null) {
- logger.warn("[BDB Impl] found Null data while loading blacklist configure!");
- continue;
- }
- GroupBlackListEntity memEntity =
- new GroupBlackListEntity(bdbEntity);
- addOrUpdCacheRecord(memEntity);
- count++;
- }
- logger.info("[BDB Impl] total blacklist configure records are {}", count);
- } catch (Exception e) {
- logger.error("[BDB Impl] load blacklist configure failure ", e);
- throw new LoadMetaException(e.getMessage());
- } finally {
- if (cursor != null) {
- cursor.close();
- }
- }
- logger.info("[BDB Impl] load blacklist configure successfully...");
- }
-
- @Override
- public boolean addGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
- GroupBlackListEntity curEntity =
- groupBlackListCache.get(memEntity.getRecordKey());
- if (curEntity != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("The blacklist ").append(memEntity.getRecordKey())
- .append("'s configure already exists, please delete it first!")
- .toString());
- return result.isSuccess();
- }
- if (putGroupBlackListConfig2Bdb(memEntity, result)) {
- addOrUpdCacheRecord(memEntity);
- }
- return result.isSuccess();
- }
-
- @Override
- public boolean updGroupBlackListConf(GroupBlackListEntity memEntity, ProcessResult result) {
- GroupBlackListEntity curEntity =
- groupBlackListCache.get(memEntity.getRecordKey());
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("The blacklist ").append(memEntity.getRecordKey())
- .append("'s configure is not exists, please add record first!")
- .toString());
- return result.isSuccess();
- }
- if (curEntity.equals(memEntity)) {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("The blacklist ").append(memEntity.getRecordKey())
- .append("'s configure have not changed, please delete it first!")
- .toString());
- return result.isSuccess();
- }
- if (putGroupBlackListConfig2Bdb(memEntity, result)) {
- addOrUpdCacheRecord(memEntity);
- result.setRetData(curEntity);
- }
- return result.isSuccess();
- }
-
- @Override
- public List<GroupBlackListEntity> getGrpBlkLstConfByGroupName(String groupName) {
- ConcurrentHashSet<String> keySet =
- groupBlackListGroupCache.get(groupName);
- if (keySet == null || keySet.isEmpty()) {
- return Collections.emptyList();
- }
- GroupBlackListEntity entity;
- List<GroupBlackListEntity> result = new ArrayList<>();
- for (String recordKey : keySet) {
- entity = groupBlackListCache.get(recordKey);
- if (entity != null) {
- result.add(entity);
- }
- }
- return result;
- }
-
- @Override
- public Set<String> getNotAllowedTopicByGroupName(String groupName) {
- ConcurrentHashSet<String> keySet =
- groupBlackListGroupCache.get(groupName);
- if (keySet == null || keySet.isEmpty()) {
- return Collections.emptySet();
- }
- GroupBlackListEntity entity;
- Set<String> result = new HashSet<>();
- for (String recordKey : keySet) {
- entity = groupBlackListCache.get(recordKey);
- if (entity != null) {
- result.add(entity.getTopicName());
- }
- }
- return result;
- }
-
- @Override
- public boolean delGroupBlackListConf(String groupName,
- String topicName,
- ProcessResult result) {
- ConcurrentHashSet<String> keySet =
- new ConcurrentHashSet<>();
- // get need deleted record key
- if (groupName == null) {
- if (topicName == null) {
- result.setSuccResult(null);
- return true;
- } else {
- keySet = groupBlackListTopicCache.get(topicName);
- }
- } else {
- if (topicName == null) {
- keySet = groupBlackListGroupCache.get(groupName);
- } else {
- keySet.add(KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName));
- }
- }
- if (keySet == null || keySet.isEmpty()) {
- result.setSuccResult(null);
- return true;
- }
- for (String key : keySet) {
- if (!delGroupBlackListConf(key, result)) {
- return result.isSuccess();
- }
- result.clear();
- }
- result.setSuccResult(null);
- return true;
- }
-
- @Override
- public boolean delGroupBlackListConf(String recordKey, ProcessResult result) {
- GroupBlackListEntity curEntity =
- groupBlackListCache.get(recordKey);
- if (curEntity == null) {
- result.setSuccResult(null);
- return true;
- }
- delGroupBlackListConfigFromBdb(recordKey);
- delCacheRecord(recordKey);
- result.setSuccResult(curEntity);
- return true;
- }
-
- @Override
- public List<GroupBlackListEntity> getGrpBlkLstConfByTopicName(String topicName) {
- ConcurrentHashSet<String> keySet =
- groupBlackListTopicCache.get(topicName);
- if (keySet == null || keySet.isEmpty()) {
- return Collections.emptyList();
- }
- GroupBlackListEntity entity;
- List<GroupBlackListEntity> result = new ArrayList<>();
- for (String recordKey : keySet) {
- entity = groupBlackListCache.get(recordKey);
- if (entity != null) {
- result.add(entity);
- }
- }
- return result;
- }
-
- @Override
- public List<GroupBlackListEntity> getGroupBlackListConf(GroupBlackListEntity qryEntity) {
- List<GroupBlackListEntity> retEntitys = new ArrayList<>();
- if (qryEntity == null) {
- retEntitys.addAll(groupBlackListCache.values());
- } else {
- for (GroupBlackListEntity entity : groupBlackListCache.values()) {
- if (entity.isMatched(qryEntity)) {
- retEntitys.add(entity);
- }
- }
- }
- return retEntitys;
- }
-
- /**
- * Put blacklist configure info into bdb store
- *
- * @param memEntity need add record
- * @param result process result with old value
- * @return
- */
- private boolean putGroupBlackListConfig2Bdb(GroupBlackListEntity memEntity,
- ProcessResult result) {
- BdbBlackGroupEntity retData = null;
- BdbBlackGroupEntity bdbEntity = memEntity.buildBdbBlackListEntity();
- try {
- retData = blackGroupIndex.put(bdbEntity);
- } catch (Throwable e) {
- logger.error("[BDB Impl] put blacklist configure failure ", e);
- result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
- new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
- .append("Put blacklist configure failure: ")
- .append(e.getMessage()).toString());
- return result.isSuccess();
- }
- result.setSuccResult(null);
- return result.isSuccess();
- }
-
- private boolean delGroupBlackListConfigFromBdb(String recordKey) {
- try {
- blackGroupIndex.delete(recordKey);
- } catch (Throwable e) {
- logger.error("[BDB Impl] delete blacklist configure failure ", e);
- return false;
- }
- return true;
- }
-
-
- private void delCacheRecord(String recordKey) {
- GroupBlackListEntity curEntity =
- groupBlackListCache.remove(recordKey);
- if (curEntity == null) {
- return;
- }
- // add topic index
- ConcurrentHashSet<String> keySet =
- groupBlackListTopicCache.get(curEntity.getTopicName());
- if (keySet != null) {
- keySet.remove(recordKey);
- if (keySet.isEmpty()) {
- groupBlackListTopicCache.remove(curEntity.getTopicName());
- }
- }
- // delete group index
- keySet = groupBlackListGroupCache.get(curEntity.getGroupName());
- if (keySet != null) {
- keySet.remove(recordKey);
- if (keySet.isEmpty()) {
- groupBlackListGroupCache.remove(curEntity.getGroupName());
- }
- }
- }
-
- private void addOrUpdCacheRecord(GroupBlackListEntity entity) {
- groupBlackListCache.put(entity.getRecordKey(), entity);
- // add topic index map
- ConcurrentHashSet<String> keySet =
- groupBlackListTopicCache.get(entity.getTopicName());
- if (keySet == null) {
- ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = groupBlackListTopicCache.putIfAbsent(entity.getTopicName(), tmpSet);
- if (keySet == null) {
- keySet = tmpSet;
- }
- }
- keySet.add(entity.getRecordKey());
- // add group index map
- keySet = groupBlackListGroupCache.get(entity.getGroupName());
- if (keySet == null) {
- ConcurrentHashSet<String> tmpSet = new ConcurrentHashSet<>();
- keySet = groupBlackListGroupCache.putIfAbsent(entity.getGroupName(), tmpSet);
- if (keySet == null) {
- keySet = tmpSet;
- }
- }
- keySet.add(entity.getRecordKey());
- }
-
- private void clearCacheData() {
- groupBlackListTopicCache.clear();
- groupBlackListGroupCache.clear();
- groupBlackListCache.clear();
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 7147a1a..8218753 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -179,6 +179,13 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
@Override
+ public boolean hasGroupConsumeCtrlConf(String groupName) {
+ ConcurrentHashSet<String> keySet =
+ grpConsumeCtrlGroupCache.get(groupName);
+ return (keySet != null && !keySet.isEmpty());
+ }
+
+ @Override
public boolean delGroupConsumeCtrlConf(String groupName,
String topicName,
ProcessResult result) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index f2714a9..acfd7d5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -40,8 +40,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerCon
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/**
@@ -56,9 +55,6 @@ import org.slf4j.LoggerFactory;
*/
public class WebBrokerConfHandler extends AbstractWebHandler {
- private static final Logger logger =
- LoggerFactory.getLogger(WebBrokerConfHandler.class);
-
/**
* Constructor
*
@@ -104,97 +100,97 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
*/
public StringBuilder adminQueryBrokerDefConfEntityInfo(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
BrokerConfEntity qryEntity = new BrokerConfEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<Integer> brokerIds = (Set<Integer>) result.retData1;
// get brokerIp info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPBROKERIP, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> brokerIpSet = (Set<String>) result.retData1;
// get brokerPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerPort = (int) result.getRetData();
// get brokerTlsPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerTlsPort = (int) result.getRetData();
// get brokerWebPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerWebPort = (int) result.getRetData();
// get regionId field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
false, TBaseConstants.META_VALUE_UNDEFINED,
TServerConstants.BROKER_REGION_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int regionId = (int) result.getRetData();
// get groupId field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
false, TBaseConstants.META_VALUE_UNDEFINED,
TServerConstants.BROKER_GROUP_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int groupId = (int) result.getRetData();
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(req,
null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
// get and valid TopicStatusId info
if (!WebParameterUtils.getTopicStatusParamValue(req,
false, TopicStatus.STATUS_TOPIC_UNDEFINED, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
TopicStatus topicStatus = (TopicStatus) result.getRetData();
// get topic info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> topicNameSet = (Set<String>) result.retData1;
// get isInclude info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.ISINCLUDE, false, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean isInclude = (Boolean) result.retData1;
// get withTopic info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.WITHTOPIC, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean withTopic = (Boolean) result.retData1;
// fill query entity fields
@@ -204,7 +200,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, qryEntity);
// build query result
int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (BrokerConfEntity entity : qryResult.values()) {
Map<String, TopicDeployEntity> topicConfEntityMap =
metaDataManager.getBrokerTopicConfEntitySet(entity.getBrokerId());
@@ -212,102 +208,144 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
continue;
}
if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- entity.toWebJsonStr(sBuilder, true, false);
- sBuilder = addTopicInfo(withTopic, sBuilder, topicConfEntityMap);
- sBuilder.append("}");
+ entity.toWebJsonStr(sBuffer, true, false);
+ sBuffer = addTopicInfo(withTopic, sBuffer, topicConfEntityMap);
+ sBuffer.append("}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
+ /**
+ * Add broker configure
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminAddBrokerConfInfo(HttpServletRequest req) {
+ return innAddOrUpdBrokerConfInfo(req, true);
+ }
+
+ /**
+ * update broker configure
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminUpdateBrokerConfInfo(HttpServletRequest req) {
+ return innAddOrUpdBrokerConfInfo(req, false);
+ }
/**
- * Add default config to a broker
+ * Add default config to brokers in batch
*
* @param req
* @return
*/
- public StringBuilder adminAddBrokerDefConfEntityInfo(HttpServletRequest req) {
+ public StringBuilder adminBatchAddBrokerConfInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdBrokerConfInfo(req, true);
+ }
+
+
+ private StringBuilder innAddOrUpdBrokerConfInfo(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get cluster default setting info
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
- // get brokerIp and brokerId field
- if (!getBrokerIpAndIdParamValue(req, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Tuple2<Integer, String> brokerIdAndIpTuple =
- (Tuple2<Integer, String>) result.getRetData();
// get brokerPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
- false, defClusterSetting.getBrokerPort(), 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ false, (isAddOp ? defClusterSetting.getBrokerPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerPort = (int) result.getRetData();
// get brokerTlsPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
- false, defClusterSetting.getBrokerTLSPort(), 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ false, (isAddOp ? defClusterSetting.getBrokerTLSPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerTlsPort = (int) result.getRetData();
// get brokerWebPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
- false, defClusterSetting.getBrokerWebPort(), 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ false, (isAddOp ? defClusterSetting.getBrokerWebPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int brokerWebPort = (int) result.getRetData();
// get regionId field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
- false, TServerConstants.BROKER_REGION_ID_DEF,
+ false, (isAddOp ? TServerConstants.BROKER_REGION_ID_DEF
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.BROKER_REGION_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int regionId = (int) result.getRetData();
// get groupId field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
- false, TServerConstants.BROKER_GROUP_ID_DEF,
+ false, (isAddOp ? TServerConstants.BROKER_GROUP_ID_DEF
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.BROKER_GROUP_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
int groupId = (int) result.getRetData();
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(req,
- defClusterSetting.getClsDefTopicProps(), result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ (isAddOp ? defClusterSetting.getClsDefTopicProps() : null), result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
- // manageStatusId
- ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_APPLY;
// add record and process result
List<BrokerProcessResult> retInfo = new ArrayList<>();
- BrokerProcessResult processResult =
- metaDataManager.addBrokerConfig(opInfoEntity, brokerIdAndIpTuple.getF0(),
- brokerIdAndIpTuple.getF1(), brokerPort, brokerTlsPort, brokerWebPort,
- regionId, groupId, manageStatus, brokerProps, sBuilder, result);
- retInfo.add(processResult);
- return buildRetInfo(retInfo, sBuilder);
+ if (isAddOp) {
+ // get brokerIp and brokerId field
+ if (!getBrokerIpAndIdParamValue(req, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Tuple2<Integer, String> brokerIdAndIpTuple =
+ (Tuple2<Integer, String>) result.getRetData();
+ retInfo.add(metaDataManager.addOrUpdBrokerConfig(isAddOp, opEntity,
+ brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1(), brokerPort,
+ brokerTlsPort, brokerWebPort, regionId, groupId,
+ ManageStatus.STATUS_MANAGE_APPLY, brokerProps, sBuffer, result));
+ } else {
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, true, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ for (Integer brokerId : brokerIdSet) {
+ retInfo.add(metaDataManager.addOrUpdBrokerConfig(isAddOp, opEntity,
+ brokerId, "", brokerPort, brokerTlsPort, brokerWebPort,
+ regionId, groupId, null, brokerProps, sBuffer, result));
+ }
+ }
+ return buildRetInfo(retInfo, sBuffer);
}
/**
@@ -315,143 +353,39 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchAddBrokerDefConfEntityInfo(HttpServletRequest req) {
+ private StringBuilder innBatchAddOrUpdBrokerConfInfo(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- BaseEntity defOpInfoEntity = (BaseEntity) result.getRetData();
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
// check and get brokerJsonSet info
- if (!getBrokerJsonSetInfo(req, true,
- defOpInfoEntity, null, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!getBrokerJsonSetInfo(req, isAddOp, defOpEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Map<Integer, BrokerConfEntity> addedRecordMap =
(HashMap<Integer, BrokerConfEntity>) result.getRetData();
// add record and process result
List<BrokerProcessResult> retInfo = new ArrayList<>();
for (BrokerConfEntity brokerEntity : addedRecordMap.values()) {
- BrokerProcessResult processResult =
- metaDataManager.addBrokerConfig(brokerEntity, sBuilder, result);
- retInfo.add(processResult);
+ retInfo.add(metaDataManager.addOrUpdBrokerConfig(
+ isAddOp, brokerEntity, sBuffer, result));
}
- return buildRetInfo(retInfo, sBuilder);
+ return buildRetInfo(retInfo, sBuffer);
}
- /**
- * Update broker default config.
- * The current record will be checked firstly.
- * The update will be performed only when there are changes.
- *
- * @param req
- * @return
- * @throws Throwable
- */
- public StringBuilder adminUpdateBrokerConf(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
- // check and get brokerId field
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.COMPSBROKERID, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
- // get brokerPort field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int brokerPort = (int) result.getRetData();
- // get brokerTlsPort field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int brokerTlsPort = (int) result.getRetData();
- // get brokerWebPort field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int brokerWebPort = (int) result.getRetData();
- // get regionId field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.REGIONID,
- false, TBaseConstants.META_VALUE_UNDEFINED,
- TServerConstants.BROKER_REGION_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int regionId = (int) result.getRetData();
- // get groupId field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.GROUPID,
- false, TBaseConstants.META_VALUE_UNDEFINED,
- TServerConstants.BROKER_GROUP_ID_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int groupId = (int) result.getRetData();
- // get and valid TopicPropGroup info
- if (!WebParameterUtils.getTopicPropInfo(req, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
- // manageStatusId
- ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_APPLY;
- // add record and process result
- List<BrokerProcessResult> retInfo = new ArrayList<>();
- BrokerConfEntity newEntity;
- for (Integer brokerId : brokerIdSet) {
- BrokerConfEntity curEntity =
- metaDataManager.getBrokerConfByBrokerId(brokerId);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- retInfo.add(new BrokerProcessResult(brokerId, "", result));
- continue;
- }
- newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(opInfoEntity);
- if (!newEntity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort, brokerTlsPort,
- brokerWebPort, regionId, groupId, null, brokerProps)) {
- result.setFailResult(DataOpErrCode.DERR_SUCCESS_UNCHANGED.getCode(),
- DataOpErrCode.DERR_SUCCESS_UNCHANGED.getDescription());
- retInfo.add(new BrokerProcessResult(brokerId, curEntity.getBrokerIp(), result));
- continue;
- }
- metaDataManager.modBrokerConfig(newEntity, sBuilder, result);
- retInfo.add(new BrokerProcessResult(brokerId, curEntity.getBrokerIp(), result));
- }
- return buildRetInfo(retInfo, sBuilder);
- }
/**
* Delete broker config
@@ -461,44 +395,44 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
*/
public StringBuilder adminDeleteBrokerConfEntityInfo(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// get isReservedData info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.ISRESERVEDDATA, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean isReservedData = (Boolean) result.retData1;
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<Integer> brokerIds = (Set<Integer>) result.retData1;
if (brokerIds.isEmpty()) {
- WebParameterUtils.buildFailResult(sBuilder,
+ WebParameterUtils.buildFailResult(sBuffer,
"Illegal value: Null value of brokerId parameter");
- return sBuilder;
+ return sBuffer;
}
Map<Integer, BrokerConfEntity> qryResult =
metaDataManager.getBrokerConfInfo(brokerIds, null, null);
if (qryResult.isEmpty()) {
- WebParameterUtils.buildFailResult(sBuilder,
+ WebParameterUtils.buildFailResult(sBuffer,
"Illegal value: not found broker configure by brokerId value");
- return sBuilder;
+ return sBuffer;
}
// check broker configure status
List<BrokerProcessResult> retInfo = new ArrayList<>();
@@ -516,7 +450,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
if (WebParameterUtils.checkBrokerInOfflining(entity.getBrokerId(),
entity.getManageStatus().getCode(), metaDataManager)) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("Illegal value: the broker is processing offline event by brokerId=")
+ sBuffer.append("Illegal value: the broker is processing offline event by brokerId=")
.append(entity.getBrokerId()).append(", please wait and try later!").toString());
retInfo.add(new BrokerProcessResult(
entity.getBrokerId(), entity.getBrokerIp(), result));
@@ -531,7 +465,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
if (entry.getValue().getTopicProps().isAcceptPublish()
|| entry.getValue().getTopicProps().isAcceptSubscribe()) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("The topic ").append(entry.getKey())
+ sBuffer.append("The topic ").append(entry.getKey())
.append("'s acceptPublish and acceptSubscribe parameters")
.append(" must be false in broker=")
.append(entity.getBrokerId())
@@ -544,7 +478,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
} else {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuilder.append("Topic configure of broker by brokerId=")
+ sBuffer.append("Topic configure of broker by brokerId=")
.append(entity.getBrokerId())
.append(" not deleted, please delete broker's topic configure first!").toString());
retInfo.add(new BrokerProcessResult(
@@ -556,7 +490,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
}
if (needDelMap.isEmpty()) {
- return buildRetInfo(retInfo, sBuilder);
+ return buildRetInfo(retInfo, sBuffer);
}
// do delete operation
for (BrokerConfEntity entry : needDelMap.values()) {
@@ -568,15 +502,15 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
metaDataManager.getBrokerTopicConfEntitySet(entry.getBrokerId());
if (brokerTopicConfMap != null) {
metaDataManager.delBrokerTopicConfig(opInfoEntity.getModifyUser(),
- entry.getBrokerId(), sBuilder, result);
+ entry.getBrokerId(), sBuffer, result);
}
}
metaDataManager.confDelBrokerConfig(opInfoEntity.getModifyUser(),
- entry.getBrokerId(), sBuilder, result);
+ entry.getBrokerId(), sBuffer, result);
retInfo.add(new BrokerProcessResult(
entry.getBrokerId(), entry.getBrokerIp(), result));
}
- return buildRetInfo(retInfo, sBuilder);
+ return buildRetInfo(retInfo, sBuffer);
}
@@ -659,11 +593,10 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
private boolean getBrokerJsonSetInfo(HttpServletRequest req, boolean isAddOp,
- BaseEntity defOpInfoEntity,
- List<Map<String, String>> defValue,
- StringBuilder sBuilder, ProcessResult result) {
+ BaseEntity defOpEntity, StringBuilder sBuffer,
+ ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.BROKERJSONSET, true, defValue, result)) {
+ WebFieldDef.BROKERJSONSET, true, null, result)) {
return result.success;
}
List<Map<String, String>> brokerJsonArray =
@@ -677,70 +610,75 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
Map<String, String> brokerObject = brokerJsonArray.get(j);
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(brokerObject,
- isAddOp, defOpInfoEntity, result)) {
+ isAddOp, defOpEntity, result)) {
return result.isSuccess();
}
BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
// get brokerIp and brokerId field
- if (!getBrokerIpAndIdParamValue(brokerObject, sBuilder, result)) {
+ if (!getBrokerIpAndIdParamValue(brokerObject, sBuffer, result)) {
return result.isSuccess();
}
Tuple2<Integer, String> brokerIdAndIpTuple =
(Tuple2<Integer, String>) result.getRetData();
// get brokerPort field
if (!WebParameterUtils.getIntParamValue(brokerObject, WebFieldDef.BROKERPORT,
- false, defClusterSetting.getBrokerPort(), 1, result)) {
+ false, (isAddOp ? defClusterSetting.getBrokerPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
return result.isSuccess();
}
int brokerPort = (int) result.getRetData();
// get brokerTlsPort field
if (!WebParameterUtils.getIntParamValue(brokerObject, WebFieldDef.BROKERTLSPORT,
- false, defClusterSetting.getBrokerTLSPort(), 1, result)) {
+ false, (isAddOp ? defClusterSetting.getBrokerTLSPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
return result.isSuccess();
}
int brokerTlsPort = (int) result.getRetData();
// get brokerWebPort field
if (!WebParameterUtils.getIntParamValue(brokerObject, WebFieldDef.BROKERWEBPORT,
- false, defClusterSetting.getBrokerWebPort(), 1, result)) {
+ false, (isAddOp ? defClusterSetting.getBrokerWebPort()
+ : TBaseConstants.META_VALUE_UNDEFINED), 1, result)) {
return result.isSuccess();
}
int brokerWebPort = (int) result.getRetData();
// get regionId field
if (!WebParameterUtils.getIntParamValue(brokerObject, WebFieldDef.REGIONID,
- false, TServerConstants.BROKER_REGION_ID_DEF,
+ false, (isAddOp ? TServerConstants.BROKER_REGION_ID_DEF
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.BROKER_REGION_ID_MIN, result)) {
return result.isSuccess();
}
int regionId = (int) result.getRetData();
// get groupId field
if (!WebParameterUtils.getIntParamValue(brokerObject, WebFieldDef.GROUPID,
- false, TServerConstants.BROKER_GROUP_ID_DEF,
+ false, (isAddOp ? TServerConstants.BROKER_GROUP_ID_DEF
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.BROKER_GROUP_ID_MIN, result)) {
return result.isSuccess();
}
int groupId = (int) result.getRetData();
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(brokerObject,
- defClusterSetting.getClsDefTopicProps(), result)) {
+ (isAddOp ? defClusterSetting.getClsDefTopicProps() : null), result)) {
return result.isSuccess();
}
TopicPropGroup brokerProps = (TopicPropGroup) result.getRetData();
// manageStatusId
ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_APPLY;
BrokerConfEntity entity =
- new BrokerConfEntity(itemOpEntity);
- entity.setBrokerIdAndIp(brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1());
+ new BrokerConfEntity(itemOpEntity,
+ brokerIdAndIpTuple.getF0(), brokerIdAndIpTuple.getF1());
entity.updModifyInfo(itemOpEntity.getDataVerId(), brokerPort, brokerTlsPort,
brokerWebPort, regionId, groupId, manageStatus, brokerProps);
addedRecordMap.put(entity.getBrokerId(), entity);
}
// check result
if (addedRecordMap.isEmpty()) {
- result.setFailResult(sBuilder
+ result.setFailResult(sBuffer
.append("Not found record in ")
.append(WebFieldDef.BROKERJSONSET.name)
.append(" parameter!").toString());
- sBuilder.delete(0, sBuilder.length());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
result.setSuccResult(addedRecordMap);
@@ -766,7 +704,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
}
private boolean getBrokerIpAndIdParamValue(HttpServletRequest req,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
// get brokerIp
if (!WebParameterUtils.getStringParamValue(req,
@@ -780,40 +718,11 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return result.success;
}
int brokerId = (int) result.getRetData();
- if (brokerId <= 0) {
- try {
- brokerId = abs(AddressUtils.ipToInt(brokerIp));
- } catch (Exception e) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
- sBuilder.append("Get ").append(WebFieldDef.BROKERID.name)
- .append(" by ").append(WebFieldDef.BROKERIP.name)
- .append(" error !, exception is :")
- .append(e.toString()).toString());
- return result.isSuccess();
- }
- }
- BrokerConfEntity curEntity = metaDataManager.getBrokerConfByBrokerIp(brokerIp);
- if (curEntity != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- sBuilder.append("Duplicated broker configure record, ")
- .append("query by ").append(WebFieldDef.BROKERIP.name)
- .append(" : ").append(brokerIp).toString());
- return result.isSuccess();
- }
- curEntity = metaDataManager.getBrokerConfByBrokerId(brokerId);
- if (curEntity != null) {
- result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- sBuilder.append("Duplicated broker configure record, ")
- .append("query by ").append(WebFieldDef.BROKERID.name)
- .append(" : ").append(brokerId).toString());
- return result.isSuccess();
- }
- result.setSuccResult(new Tuple2<>(brokerId, brokerIp));
- return result.isSuccess();
+ return validBrokerIdAndIpValues(brokerId, brokerIp, sBuffer, result);
}
private boolean getBrokerIpAndIdParamValue(Map<String, String> keyValueMap,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
// get brokerIp
if (!WebParameterUtils.getStringParamValue(keyValueMap,
@@ -827,40 +736,46 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return result.success;
}
int brokerId = (int) result.getRetData();
+ return validBrokerIdAndIpValues(brokerId, brokerIp, sBuffer, result);
+ }
+
+
+ private boolean validBrokerIdAndIpValues(int brokerId, String brokerIp,
+ StringBuilder sBuffer,
+ ProcessResult result) {
if (brokerId <= 0) {
try {
brokerId = abs(AddressUtils.ipToInt(brokerIp));
} catch (Exception e) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
- sBuilder.append("Get ").append(WebFieldDef.BROKERID.name)
+ sBuffer.append("Get ").append(WebFieldDef.BROKERID.name)
.append(" by ").append(WebFieldDef.BROKERIP.name)
.append(" error !, exception is :")
.append(e.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
}
BrokerConfEntity curEntity = metaDataManager.getBrokerConfByBrokerIp(brokerIp);
if (curEntity != null) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- sBuilder.append("Duplicated broker configure record, ")
- .append("query by ").append(WebFieldDef.BROKERIP.name)
+ sBuffer.append("Duplicated broker configure record, query by ")
+ .append(WebFieldDef.BROKERIP.name)
.append(" : ").append(brokerIp).toString());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
curEntity = metaDataManager.getBrokerConfByBrokerId(brokerId);
if (curEntity != null) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
- sBuilder.append("Duplicated broker configure record, ")
- .append("query by ").append(WebFieldDef.BROKERID.name)
- .append(" : ").append(brokerId).toString());
+ sBuffer.append("Duplicated broker configure record, query by ")
+ .append(WebFieldDef.BROKERID.name).append(" : ")
+ .append(brokerId).toString());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
result.setSuccResult(new Tuple2<>(brokerId, brokerIp));
return result.isSuccess();
}
-
-
-
-
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 03cdb62..d896ab2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -18,41 +18,25 @@
package org.apache.tubemq.server.master.web.handler;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
-import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
-import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
- private static final Logger logger =
- LoggerFactory.getLogger(WebGroupConsumeCtrlHandler.class);
-
-
public WebGroupConsumeCtrlHandler(TMaster master) {
super(master);
}
@@ -60,47 +44,19 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
@Override
public void registerWebApiMethod() {
// register query method
- registerQueryWebMethod("admin_query_black_consumer_group_info",
- "adminQueryBlackGroupInfo");
- registerQueryWebMethod("admin_query_allowed_consumer_group_info",
- "adminQueryConsumerGroupInfo");
- registerQueryWebMethod("admin_query_group_filtercond_info",
- "adminQueryGroupFilterCondInfo");
- registerQueryWebMethod("admin_query_consume_group_setting",
- "adminQueryConsumeGroupSetting");
+ registerQueryWebMethod("admin_query_group_csmctrl_info",
+ "adminQueryGroupConsumeCtrlInfo");
// register modify method
- registerModifyWebMethod("admin_add_black_consumergroup_info",
- "adminAddBlackGroupInfo");
- registerModifyWebMethod("admin_bath_add_black_consumergroup_info",
- "adminBatchAddBlackGroupInfo");
- registerModifyWebMethod("admin_delete_black_consumergroup_info",
- "adminDeleteBlackGroupInfo");
- registerModifyWebMethod("admin_add_authorized_consumergroup_info",
- "adminAddConsumerGroupInfo");
- registerModifyWebMethod("admin_delete_allowed_consumer_group_info",
- "adminDeleteConsumerGroupInfo");
- registerModifyWebMethod("admin_bath_add_authorized_consumergroup_info",
- "adminBatchAddConsumerGroupInfo");
- registerModifyWebMethod("admin_add_group_filtercond_info",
- "adminAddGroupFilterCondInfo");
- registerModifyWebMethod("admin_bath_add_group_filtercond_info",
- "adminBatchAddGroupFilterCondInfo");
- registerModifyWebMethod("admin_mod_group_filtercond_info",
- "adminModGroupFilterCondInfo");
- registerModifyWebMethod("admin_bath_mod_group_filtercond_info",
- "adminBatchModGroupFilterCondInfo");
- registerModifyWebMethod("admin_del_group_filtercond_info",
- "adminDeleteGroupFilterCondInfo");
- registerModifyWebMethod("admin_add_consume_group_setting",
- "adminAddConsumeGroupSettingInfo");
- registerModifyWebMethod("admin_bath_add_consume_group_setting",
- "adminBatchAddConsumeGroupSetting");
- registerModifyWebMethod("admin_upd_consume_group_setting",
- "adminUpdConsumeGroupSetting");
- registerModifyWebMethod("admin_del_consume_group_setting",
- "adminDeleteConsumeGroupSetting");
- registerModifyWebMethod("admin_rebalance_group_allocate",
- "adminRebalanceGroupAllocateInfo");
+ registerModifyWebMethod("admin_add_group_csmctrl_info",
+ "adminAddGroupConsumeCtrlInfo");
+ registerModifyWebMethod("admin_batch_add_group_csmctrl_info",
+ "adminBatchAddGroupConsumeCtrlInfo");
+ registerModifyWebMethod("admin_update_group_csmctrl_info",
+ "adminModGroupConsumeCtrlInfo");
+ registerModifyWebMethod("admin_batch_update_group_csmctrl_info",
+ "adminBatchModGroupConsumeCtrlInfo");
+ registerModifyWebMethod("admin_delete_group_csmctrl_info",
+ "adminDelGroupConsumeCtrlInfo");
}
@@ -112,46 +68,46 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
*/
public StringBuilder adminQueryGroupConsumeCtrlInfo(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// build query entity
GroupConsumeCtrlEntity qryEntity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// get group list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> groupSet = (Set<String>) result.retData1;
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> topicNameSet = (Set<String>) result.retData1;
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.CONSUMEENABLE, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean consumeEnable = (Boolean) result.retData1;
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.FILTERENABLE, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean filterEnable = (Boolean) result.retData1;
// get filterConds info
if (!WebParameterUtils.getFilterCondSet(req, false, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> filterCondSet = (Set<String>) result.retData1;
qryEntity.updModifyInfo(qryEntity.getDataVerId(),
@@ -160,7 +116,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
metaDataManager.getGroupConsumeCtrlConf(groupSet, topicNameSet);
// build return result
int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (List<GroupConsumeCtrlEntity> consumeCtrlEntityList : qryResultSet.values()) {
if (consumeCtrlEntityList == null || consumeCtrlEntityList.isEmpty()) {
continue;
@@ -172,13 +128,13 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
continue;
}
if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- entity.toWebJsonStr(sBuilder, true, true);
+ entity.toWebJsonStr(sBuffer, true, true);
}
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
/**
@@ -188,121 +144,17 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminAddGroupConsumeCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
- // check and get topicName field
- if (!WebParameterUtils.getAndValidTopicNameInfo(req,
- metaDataManager, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // get groupName field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<String> groupNameSet = (Set<String>) result.retData1;
- // get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.CONSUMEENABLE, false, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean consumeEnable = (Boolean) result.retData1;
- // get disableReason list
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.REASON, false, "", result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- String disableRsn = (String) result.retData1;
- // get filterEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.FILTERENABLE, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean filterEnable = (Boolean) result.retData1;
- // get filterConds info
- if (!WebParameterUtils.getFilterCondString(req, false, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- String filterCondStr = (String) result.retData1;
- // add group resource record
- GroupProcessResult csmProcessResult;
- List<GroupProcessResult> retInfo = new ArrayList<>();
- for (String groupName : groupNameSet) {
- for (String topicName : topicNameSet) {
- csmProcessResult =
- metaDataManager.addGroupConsumeCtrlInfo(opInfoEntity, groupName,
- topicName, consumeEnable, disableRsn,
- filterEnable, filterCondStr, sBuilder, result);
- retInfo.add(csmProcessResult);
- }
- }
- buildRetInfo(retInfo, sBuilder);
- return sBuilder;
+ return innAddOrUpdGroupConsumeCtrlInfo(req, true);
}
/**
- * Batch add group consume control info
+ * Add group consume control info in batch
*
* @param req
* @return
*/
public StringBuilder adminBatchAddGroupConsumeCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- BaseEntity defOpEntity = (BaseEntity) result.getRetData();
- // check and get groupCsmJsonSet data
- if (!getGroupConsumeJsonSetInfo(req, true,
- defOpEntity, null, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Map<String, GroupProcessResult> batchAddInfoMap =
- (Map<String, GroupProcessResult>) result.getRetData();
- // add group resource record
- GroupConsumeCtrlEntity addEntity;
- GroupProcessResult addResult;
- List<GroupProcessResult> retInfo = new ArrayList<>();
- for (GroupProcessResult addInfo : batchAddInfoMap.values()) {
- if (!addInfo.isSuccess()) {
- retInfo.add(addInfo);
- continue;
- }
- addEntity = (GroupConsumeCtrlEntity) addInfo.getRetData();
- addResult = metaDataManager.addGroupConsumeCtrlInfo(addEntity, sBuilder, result);
- retInfo.add(addResult);
- }
- buildRetInfo(retInfo, sBuilder);
- return sBuilder;
+ return innBatchAddOrUpdGroupConsumeCtrlInfo(req, true);
}
/**
@@ -312,388 +164,259 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminModGroupConsumeCtrlInfo(HttpServletRequest req) {
+ return innAddOrUpdGroupConsumeCtrlInfo(req, false);
+ }
+
+ /**
+ * Modify group consume control info in batch
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminBatchModGroupConsumeCtrlInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdGroupConsumeCtrlInfo(req, false);
+ }
+
+ /**
+ * Delete group consume configure info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDelGroupConsumeCtrlInfo(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get groupName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.retData1;
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // execute delete operation
+ List<GroupProcessResult> retInfo =
+ metaDataManager.delGroupConsumeCtrlConf(opEntity.getModifyUser(),
+ groupNameSet, topicNameSet, sBuffer, result);
+ buildRetInfo(retInfo, sBuffer);
+ return sBuffer;
+ }
+
+ private StringBuilder innAddOrUpdGroupConsumeCtrlInfo(HttpServletRequest req,
+ boolean isAddOp) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getAndValidTopicNameInfo(req,
metaDataManager, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> topicNameSet = (Set<String>) result.retData1;
// get groupName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Set<String> groupNameSet = (Set<String>) result.retData1;
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.CONSUMEENABLE, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebFieldDef.CONSUMEENABLE, false, (isAddOp ? true : null), result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean consumeEnable = (Boolean) result.retData1;
// get disableReason list
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.REASON, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebFieldDef.REASON, false, (isAddOp ? "" : null), result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
String disableRsn = (String) result.retData1;
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.FILTERENABLE, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebFieldDef.FILTERENABLE, false, (isAddOp ? false : null), result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
Boolean filterEnable = (Boolean) result.retData1;
// get filterConds info
- if (!WebParameterUtils.getFilterCondString(req, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!WebParameterUtils.getFilterCondString(req, false, isAddOp, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
String filterCondStr = (String) result.retData1;
- // modify group resource record
- GroupProcessResult csmProcessResult;
+ // add group resource record
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : groupNameSet) {
for (String topicName : topicNameSet) {
- csmProcessResult =
- metaDataManager.modGroupConsumeCtrlInfo(opEntity, groupName,
- topicName, consumeEnable, disableRsn,
- filterEnable, filterCondStr, sBuilder, result);
- retInfo.add(csmProcessResult);
+ retInfo.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(isAddOp,
+ opInfoEntity, groupName, topicName, consumeEnable, disableRsn,
+ filterEnable, filterCondStr, sBuffer, result));
}
}
- buildRetInfo(retInfo, sBuilder);
- return sBuilder;
+ return buildRetInfo(retInfo, sBuffer);
}
- /**
- * Modify group consume control info in batch
- *
- * @param req
- * @return
- */
- public StringBuilder adminBatchModGroupFilterCondInfo(HttpServletRequest req) {
+ private StringBuilder innBatchAddOrUpdGroupConsumeCtrlInfo(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- BaseEntity opEntity = (BaseEntity) result.getRetData();
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
// check and get groupCsmJsonSet data
- if (!getGroupConsumeJsonSetInfo(req, true,
- opEntity, null, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!getGroupConsumeJsonSetInfo(req, isAddOp, defOpEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- Map<String, GroupProcessResult> batchModInfoMap =
- (Map<String, GroupProcessResult>) result.getRetData();
+ Map<String, GroupConsumeCtrlEntity> batchAddInfoMap =
+ (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
// add group resource record
- GroupConsumeCtrlEntity modEntity;
- GroupProcessResult modResult;
+ GroupProcessResult addResult;
List<GroupProcessResult> retInfo = new ArrayList<>();
- for (GroupProcessResult addInfo : batchModInfoMap.values()) {
- if (!addInfo.isSuccess()) {
- retInfo.add(addInfo);
- continue;
- }
- modEntity = (GroupConsumeCtrlEntity) addInfo.getRetData();
- modResult = metaDataManager.modGroupConsumeCtrlInfo(modEntity, sBuilder, result);
- retInfo.add(modResult);
+ for (GroupConsumeCtrlEntity ctrlEntity : batchAddInfoMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(
+ isAddOp, ctrlEntity, sBuffer, result));
}
- buildRetInfo(retInfo, sBuilder);
- return sBuilder;
+ buildRetInfo(retInfo, sBuffer);
+ return sBuffer;
}
- /**
- * Delete group consume configure info
- *
- * @param req
- * @return
- */
- public StringBuilder adminDeleteGroupFilterCondInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check and get topicName field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // get groupName field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, false, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<String> groupNameSet = (Set<String>) result.retData1;
- // execute delete operation
- List<GroupProcessResult> retInfo =
- metaDataManager.delGroupConsumeCtrlConf(opEntity.getModifyUser(),
- groupNameSet, topicNameSet, sBuilder, result);
- buildRetInfo(retInfo, sBuilder);
- return sBuilder;
- }
-
-
private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
- StringBuilder sBuilder) {
+ StringBuilder sBuffer) {
int totalCnt = 0;
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (GroupProcessResult result : retInfo) {
if (totalCnt++ > 0) {
- sBuilder.append(",");
+ sBuffer.append(",");
}
- sBuilder.append("{\"groupName\":\"").append(result.getGroupName()).append("\"")
- .append("{\"topicName\":\"").append(result.getTopicName()).append("\"")
+ sBuffer.append("{\"groupName\":\"").append(result.getGroupName()).append("\"")
+ .append(",\"topicName\":\"").append(result.getTopicName()).append("\"")
.append(",\"success\":").append(result.isSuccess())
.append(",\"errCode\":").append(result.getErrCode())
.append(",\"errInfo\":\"").append(result.getErrInfo()).append("\"}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
- return sBuilder;
- }
-
-
-
- /**
- * Re-balance group allocation info
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminRebalanceGroupAllocateInfo(HttpServletRequest req) {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int reJoinWait =
- WebParameterUtils.validIntDataParameter("reJoinWait",
- req.getParameter("reJoinWait"),
- false, 0, 0);
- Set<String> batchOpConsumerIds = new HashSet<>();
- String inputConsumerId = req.getParameter("consumerId");
- if (TStringUtils.isNotBlank(inputConsumerId)) {
- inputConsumerId = inputConsumerId.trim();
- String[] strInputConsumerIds =
- inputConsumerId.split(TokenConstants.ARRAY_SEP);
- for (int i = 0; i < strInputConsumerIds.length; i++) {
- if (TStringUtils.isBlank(strInputConsumerIds[i])) {
- continue;
- }
- String consumerId = strInputConsumerIds[i].trim();
- if (consumerId.length() > TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH) {
- throw new Exception(sBuilder.append("The max length of ")
- .append(consumerId)
- .append(" in consumerId parameter over ")
- .append(TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH)
- .append(" characters").toString());
- }
- if (!consumerId.matches(TBaseConstants.META_TMP_CONSUMERID_VALUE)) {
- throw new Exception(sBuilder.append("The value of ").append(consumerId)
- .append("in consumerId parameter must begin with a letter, " +
- "can only contain characters,numbers,dot,scores,and underscores").toString());
- }
- if (!batchOpConsumerIds.contains(consumerId)) {
- batchOpConsumerIds.add(consumerId);
- }
- }
- }
- if (batchOpConsumerIds.isEmpty()) {
- throw new Exception("Null value of required consumerId parameter");
- }
- ConsumerInfoHolder consumerInfoHolder =
- master.getConsumerHolder();
- ConsumerBandInfo consumerBandInfo =
- consumerInfoHolder.getConsumerBandInfo(groupName);
- if (consumerBandInfo == null) {
- return sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"The group(")
- .append(groupName).append(") not online! \"}");
- } else {
- Map<String, NodeRebInfo> nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
- for (String consumerId : batchOpConsumerIds) {
- if (nodeRebInfoMap.containsKey(consumerId)) {
- return sBuilder
- .append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Duplicated set for consumerId(")
- .append(consumerId).append(") in group(")
- .append(groupName).append(")! \"}");
- }
- }
- logger.info(sBuilder.append("[Re-balance] Add rebalance consumer: group=")
- .append(groupName).append(", consumerIds=")
- .append(batchOpConsumerIds.toString())
- .append(", reJoinWait=").append(reJoinWait)
- .append(", creator=").append(modifyUser).toString());
- sBuilder.delete(0, sBuilder.length());
- consumerInfoHolder.addRebConsumerInfo(groupName, batchOpConsumerIds, reJoinWait);
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- }
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
-
-
private boolean getGroupConsumeJsonSetInfo(HttpServletRequest req, boolean isAddOp,
- BaseEntity defOpEntity,
- List<Map<String, String>> defValue,
- StringBuilder sBuilder,
+ BaseEntity defOpEntity, StringBuilder sBuffer,
ProcessResult result) {
+ // get groupCsmJsonSet field info
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.GROUPCSMJSONSET, true, defValue, result)) {
+ WebFieldDef.GROUPCSMJSONSET, true, null, result)) {
return result.success;
}
List<Map<String, String>> filterJsonArray =
(List<Map<String, String>>) result.retData1;
+ // parse groupCsmJsonSet field info
+ GroupConsumeCtrlEntity itemConf;
+ Map<String, String> itemsMap;
+ HashMap<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
Set<String> configuredTopicSet =
metaDataManager.getTotalConfiguredTopicNames();
- HashMap<String, GroupProcessResult> csmProcessMap =
- new HashMap<>();
for (int j = 0; j < filterJsonArray.size(); j++) {
- Map<String, String> groupObject = filterJsonArray.get(j);
- if (!WebParameterUtils.getStringParamValue(groupObject,
+ itemsMap = filterJsonArray.get(j);
+ if (!WebParameterUtils.getStringParamValue(itemsMap,
WebFieldDef.GROUPNAME, true, "", result)) {
return result.success;
}
String groupName = (String) result.retData1;
- if (!WebParameterUtils.getStringParamValue(groupObject,
+ if (!WebParameterUtils.getStringParamValue(itemsMap,
WebFieldDef.TOPICNAME, true, "", result)) {
return result.success;
}
String topicName = (String) result.retData1;
if (!configuredTopicSet.contains(topicName)) {
- result.setFailResult(sBuilder
+ result.setFailResult(sBuffer
.append(WebFieldDef.TOPICNAME.name)
- .append(" value ").append(topicName)
+ .append(" ").append(topicName)
.append(" is not configure, please configure first!").toString());
- sBuilder.delete(0, sBuilder.length());
+ sBuffer.delete(0, sBuffer.length());
return result.success;
}
// get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(groupObject,
+ if (!WebParameterUtils.getBooleanParamValue(itemsMap,
WebFieldDef.CONSUMEENABLE, false, (isAddOp ? true : null), result)) {
return result.isSuccess();
}
Boolean consumeEnable = (Boolean) result.retData1;
// get disableReason list
- if (!WebParameterUtils.getStringParamValue(groupObject,
- WebFieldDef.REASON, false, "", result)) {
+ if (!WebParameterUtils.getStringParamValue(itemsMap,
+ WebFieldDef.REASON, false, (isAddOp ? "" : null), result)) {
return result.isSuccess();
}
String disableRsn = (String) result.retData1;
// get filterEnable info
- if (!WebParameterUtils.getBooleanParamValue(groupObject,
+ if (!WebParameterUtils.getBooleanParamValue(itemsMap,
WebFieldDef.FILTERENABLE, false, (isAddOp ? false : null), result)) {
return result.isSuccess();
}
Boolean filterEnable = (Boolean) result.retData1;
// get filterConds info
- if (!WebParameterUtils.getFilterCondString(groupObject,
- false, isAddOp, result)) {
+ if (!WebParameterUtils.getFilterCondString(
+ itemsMap, false, isAddOp, result)) {
return result.isSuccess();
}
String filterCondStr = (String) result.retData1;
- // record object
- if (isAddOp) {
- // add new record
- GroupConsumeCtrlEntity entity =
- new GroupConsumeCtrlEntity(defOpEntity, groupName, topicName);
- entity.updModifyInfo(defOpEntity.getDataVerId(),
- consumeEnable, disableRsn, filterEnable, filterCondStr);
- result.setSuccResult(entity);
- csmProcessMap.put(entity.getRecordKey(),
- new GroupProcessResult(groupName, topicName, result));
- } else {
- // modify current record
- GroupConsumeCtrlEntity curEntity =
- metaDataManager.getGroupConsumeCtrlConf(groupName, topicName);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_NOT_EXIST.getDescription());
- csmProcessMap.put(KeyBuilderUtils.buildGroupTopicRecKey(groupName, topicName),
- new GroupProcessResult(groupName, topicName, result));
- continue;
- }
- GroupConsumeCtrlEntity newEntity = curEntity.clone();
- newEntity.updBaseModifyInfo(defOpEntity);
- if (newEntity.updModifyInfo(defOpEntity.getDataVerId(),
- consumeEnable, disableRsn, filterEnable, filterCondStr)) {
- result.setSuccResult(newEntity);
- } else {
- result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
- DataOpErrCode.DERR_UNCHANGED.getDescription());
- }
- csmProcessMap.put(newEntity.getRecordKey(),
- new GroupProcessResult(groupName, topicName, result));
- }
+ // add record object
+ itemConf = new GroupConsumeCtrlEntity(defOpEntity, groupName, topicName);
+ itemConf.updModifyInfo(defOpEntity.getDataVerId(),
+ consumeEnable, disableRsn, filterEnable, filterCondStr);
+ addRecordMap.put(itemConf.getRecordKey(), itemConf);
}
// check result
- if (csmProcessMap.isEmpty()) {
- result.setFailResult(sBuilder
+ if (addRecordMap.isEmpty()) {
+ result.setFailResult(sBuffer
.append("Not found record in ")
.append(WebFieldDef.GROUPCSMJSONSET.name)
.append(" parameter!").toString());
- sBuilder.delete(0, sBuilder.length());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
- result.setSuccResult(csmProcessMap);
+ result.setSuccResult(addRecordMap);
return result.isSuccess();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index e8d5265..9bf0f6d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -18,6 +18,7 @@
package org.apache.tubemq.server.master.web.handler;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -47,29 +49,14 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
// register modify method
registerModifyWebMethod("admin_add_group_resctrl_info",
"adminAddGroupResCtrlConf");
- registerModifyWebMethod("admin_upd_group_resctrl_info",
+ registerModifyWebMethod("admin_batch_add_group_resctrl_info",
+ "adminBatchAddGroupResCtrlConf");
+ registerModifyWebMethod("admin_update_group_resctrl_info",
"adminModGroupResCtrlConf");
- registerModifyWebMethod("admin_rmv_group_resctrl_info",
+ registerModifyWebMethod("admin_batch_update_group_resctrl_info",
+ "adminBatchUpdGroupResCtrlConf");
+ registerModifyWebMethod("admin_delete_group_resctrl_info",
"adminDelGroupResCtrlConf");
-
- // register query method
- registerQueryWebMethod("admin_query_def_flow_control_rule",
- "adminBlankProcessFun");
- registerQueryWebMethod("admin_query_group_flow_control_rule",
- "adminBlankProcessFun");
- // register modify method
- registerModifyWebMethod("admin_set_def_flow_control_rule",
- "adminBlankProcessFun");
- registerModifyWebMethod("admin_set_group_flow_control_rule",
- "adminBlankProcessFun");
- registerModifyWebMethod("admin_rmv_def_flow_control_rule",
- "adminBlankProcessFun");
- registerModifyWebMethod("admin_rmv_group_flow_control_rule",
- "adminBlankProcessFun");
- registerModifyWebMethod("admin_upd_def_flow_control_rule",
- "adminBlankProcessFun");
- registerModifyWebMethod("admin_upd_group_flow_control_rule",
- "adminBlankProcessFun");
}
/**
@@ -151,7 +138,47 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
* @param req
* @return
*/
- private StringBuilder adminAddGroupResCtrlConf(HttpServletRequest req) {
+ public StringBuilder adminAddGroupResCtrlConf(HttpServletRequest req) {
+ return innAddOrUpdGroupResCtrlConf(req, true);
+ }
+
+ /**
+ * Add group resource control info in batch
+ *
+ * @param req
+ * @return
+ */
+ private StringBuilder adminBatchAddGroupResCtrlConf(HttpServletRequest req) {
+ return innBatchAddOrUpdGroupResCtrlConf(req, true);
+ }
+
+ /**
+ * update group resource control info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminModGroupResCtrlConf(HttpServletRequest req) {
+ return innAddOrUpdGroupResCtrlConf(req, false);
+ }
+
+ /**
+ * update group resource control info in batch
+ *
+ * @param req
+ * @return
+ */
+ private StringBuilder adminBatchUpdGroupResCtrlConf(HttpServletRequest req) {
+ return innBatchAddOrUpdGroupResCtrlConf(req, false);
+ }
+
+ /**
+ * delete group resource control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDelGroupResCtrlConf(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -161,7 +188,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -173,77 +200,15 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
Set<String> batchGroupNames = (Set<String>) result.retData1;
- // get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.CONSUMEENABLE, false, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean consumeEnable = (Boolean) result.retData1;
- // get disableReason list
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.REASON, false, "", result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- String disableRsn = (String) result.retData1;
- // get resCheckStatus info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.RESCHECKENABLE, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean resCheckEnable = (Boolean) result.retData1;
- // get and valid allowedBrokerClientRate info
- if (!WebParameterUtils.getQryPriorityIdParameter(req,
- false, TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN,
- TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int allowedBClientRate = (int) result.retData1;
- // get and valid qryPriorityId info
- if (!WebParameterUtils.getQryPriorityIdParameter(req,
- false, TServerConstants.QRY_PRIORITY_DEF_VALUE,
- TServerConstants.QRY_PRIORITY_MIN_VALUE, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- int qryPriorityId = (int) result.retData1;
- // get flowCtrlEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.FLOWCTRLENABLE, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean flowCtrlEnable = (Boolean) result.retData1;
- // get and flow control rule info
- int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req,
- TServerConstants.BLANK_FLOWCTRL_RULES, result);
- if (!result.success) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- String flowCtrlInfo = (String) result.retData1;
- // add group resource record
- GroupProcessResult retItem;
- List<GroupProcessResult> retInfo = new ArrayList<>();
- for (String groupName : batchGroupNames) {
- retItem = metaDataManager.addGroupResCtrlConf(opEntity, groupName, consumeEnable,
- disableRsn, resCheckEnable, allowedBClientRate, qryPriorityId,
- flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuilder, result);
- retInfo.add(retItem);
- }
+ // delete group resource record
+ List<GroupProcessResult> retInfo =
+ metaDataManager.delGroupResCtrlConf(opEntity.getModifyUser(),
+ batchGroupNames, sBuilder, result);
return buildRetInfo(retInfo, sBuilder);
}
- /**
- * update group resource control info
- *
- * @param req
- * @return
- */
- private StringBuilder adminModGroupResCtrlConf(HttpServletRequest req) {
+ private StringBuilder innAddOrUpdGroupResCtrlConf(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -253,7 +218,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -267,36 +232,41 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
Set<String> batchGroupNames = (Set<String>) result.retData1;
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.CONSUMEENABLE, false, null, result)) {
+ WebFieldDef.CONSUMEENABLE, false, (isAddOp ? true : null), result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
Boolean consumeEnable = (Boolean) result.retData1;
- // get disableReason list
+ // get disableReason info
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.REASON, false, "", result)) {
+ WebFieldDef.REASON, false, (isAddOp ? "" : null), result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
String disableRsn = (String) result.retData1;
// get resCheckStatus info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.RESCHECKENABLE, false, null, result)) {
+ WebFieldDef.RESCHECKENABLE, false, (isAddOp ? false : null), result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
Boolean resCheckEnable = (Boolean) result.retData1;
// get and valid allowedBrokerClientRate info
- if (!WebParameterUtils.getQryPriorityIdParameter(req,
- false, TBaseConstants.META_VALUE_UNDEFINED,
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.ALWDBCRATE,
+ false, (isAddOp ? TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
int allowedBClientRate = (int) result.retData1;
+ // get def cluster setting info
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting(false);
// get and valid qryPriorityId info
if (!WebParameterUtils.getQryPriorityIdParameter(req,
- false, TBaseConstants.META_VALUE_UNDEFINED,
+ false, (isAddOp ? defClusterSetting.getQryPriorityId()
+ : TBaseConstants.META_VALUE_UNDEFINED),
TServerConstants.QRY_PRIORITY_MIN_VALUE, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
@@ -304,38 +274,34 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
int qryPriorityId = (int) result.retData1;
// get flowCtrlEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.FLOWCTRLENABLE, false, null, result)) {
+ WebFieldDef.FLOWCTRLENABLE, false, (isAddOp ? false : null), result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
Boolean flowCtrlEnable = (Boolean) result.retData1;
// get and flow control rule info
- int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req, null, result);
+ int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req,
+ (isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), result);
if (!result.success) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
String flowCtrlInfo = (String) result.retData1;
- // modify group resource record
+ // add group resource record
GroupProcessResult retItem;
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : batchGroupNames) {
- retItem = metaDataManager.updGroupResCtrlConf(opEntity, groupName,
+ retItem = metaDataManager.addOrUpdGroupResCtrlConf(isAddOp, opEntity, groupName,
consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
- qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
- sBuilder, result);
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuilder, result);
retInfo.add(retItem);
}
return buildRetInfo(retInfo, sBuilder);
}
- /**
- * delete group resource control rule
- *
- * @param req
- * @return
- */
- private StringBuilder adminDelGroupResCtrlConf(HttpServletRequest req) {
+
+ private StringBuilder innBatchAddOrUpdGroupResCtrlConf(HttpServletRequest req,
+ boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -345,25 +311,127 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // get group list
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, true, null, result)) {
+ BaseEntity defOpEntity = (BaseEntity) result.getRetData();
+ // get group resource control json record
+ if (!getGroupResCtrlJsonSetInfo(req, isAddOp, defOpEntity, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // delete group resource record
- List<GroupProcessResult> retInfo =
- metaDataManager.delGroupResCtrlConf(opEntity.getModifyUser(),
- batchGroupNames, sBuilder, result);
+ Map<String, GroupResCtrlEntity> addRecordMap =
+ (Map<String, GroupResCtrlEntity>) result.getRetData();
+ // add or update group resource record
+ List<GroupProcessResult> retInfo = new ArrayList<>();
+ for (GroupResCtrlEntity newResCtrlEntity : addRecordMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdGroupResCtrlConf(
+ isAddOp, newResCtrlEntity, sBuilder, result));
+ }
return buildRetInfo(retInfo, sBuilder);
}
+ private boolean getGroupResCtrlJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+ BaseEntity defOpEntity, StringBuilder sBuilder,
+ ProcessResult result) {
+ if (!WebParameterUtils.getJsonArrayParamValue(req,
+ WebFieldDef.GROUPRESCTRLSET, true, null, result)) {
+ return result.success;
+ }
+ List<Map<String, String>> ctrlJsonArray =
+ (List<Map<String, String>>) result.retData1;
+ // get default qryPriorityId
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting(false);
+ int defQryPriorityId = defClusterSetting.getQryPriorityId();
+ // check and get topic control configure
+ GroupResCtrlEntity itemEntity;
+ Map<String, String> itemValueMap;
+ HashMap<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
+ for (int j = 0; j < ctrlJsonArray.size(); j++) {
+ itemValueMap = ctrlJsonArray.get(j);
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
+ isAddOp, defOpEntity, result)) {
+ return result.isSuccess();
+ }
+ BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
+ // get group configure info
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.GROUPNAME, true, "", result)) {
+ return result.success;
+ }
+ String groupName = (String) result.retData1;
+ // get consumeEnable info
+ if (!WebParameterUtils.getBooleanParamValue(itemValueMap,
+ WebFieldDef.CONSUMEENABLE, false, (isAddOp ? true : null), result)) {
+ return result.isSuccess();
+ }
+ Boolean consumeEnable = (Boolean) result.retData1;
+ // get disableReason info
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.REASON, false, (isAddOp ? "" : null), result)) {
+ return result.isSuccess();
+ }
+ String disableRsn = (String) result.retData1;
+ // get resCheckStatus info
+ if (!WebParameterUtils.getBooleanParamValue(itemValueMap,
+ WebFieldDef.RESCHECKENABLE, false, (isAddOp ? false : null), result)) {
+ return result.isSuccess();
+ }
+ Boolean resCheckEnable = (Boolean) result.retData1;
+ // get and valid allowedBrokerClientRate info
+ if (!WebParameterUtils.getIntParamValue(itemValueMap, WebFieldDef.ALWDBCRATE,
+ false, (isAddOp ? TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN
+ : TBaseConstants.META_VALUE_UNDEFINED),
+ TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, result)) {
+ return result.isSuccess();
+ }
+ int allowedBClientRate = (int) result.retData1;
+ // get def cluster setting info
+ // get and valid qryPriorityId info
+ if (!WebParameterUtils.getQryPriorityIdParameter(itemValueMap,
+ false, (isAddOp ? defClusterSetting.getQryPriorityId()
+ : TBaseConstants.META_VALUE_UNDEFINED),
+ TServerConstants.QRY_PRIORITY_MIN_VALUE, result)) {
+ return result.isSuccess();
+ }
+ int qryPriorityId = (int) result.retData1;
+ // get flowCtrlEnable info
+ if (!WebParameterUtils.getBooleanParamValue(itemValueMap,
+ WebFieldDef.FLOWCTRLENABLE, false,
+ (isAddOp ? false : null), result)) {
+ return result.isSuccess();
+ }
+ Boolean flowCtrlEnable = (Boolean) result.retData1;
+ // get and flow control rule info
+ int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(itemValueMap,
+ (isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), result);
+ if (!result.success) {
+ return result.isSuccess();
+ }
+ String flowCtrlInfo = (String) result.retData1;
+ itemEntity =
+ new GroupResCtrlEntity(itemOpEntity, groupName);
+ itemEntity.updModifyInfo(itemEntity.getDataVerId(),
+ consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
+ addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+ }
+ // check result
+ if (addRecordMap.isEmpty()) {
+ result.setFailResult(sBuilder
+ .append("Not found record info in ")
+ .append(WebFieldDef.GROUPRESCTRLSET.name)
+ .append(" parameter!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(addRecordMap);
+ return result.isSuccess();
+ }
+
private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
StringBuilder sBuilder) {
int totalCnt = 0;
@@ -381,18 +449,4 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
- /**
- * blank process function
- *
- * @param req
- * @return
- */
- public StringBuilder adminBlankProcessFun(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- WebParameterUtils.buildFailResult(sBuilder,
- "Expired method, please check the latest api documentation!");
- return sBuilder;
- }
-
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index 2daa6ee..55152cd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -59,9 +59,9 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
"adminAddTopicCtrlInfo");
registerModifyWebMethod("admin_batch_add_topic_control_info",
"adminBatchAddTopicCtrlInfo");
- registerModifyWebMethod("admin_modify_topic_control_info",
+ registerModifyWebMethod("admin_update_topic_control_info",
"adminModTopicCtrlInfo");
- registerModifyWebMethod("admin_batch_modify_topic_control_info",
+ registerModifyWebMethod("admin_batch_update_topic_control_info",
"adminBatchModTopicCtrlInfo");
registerModifyWebMethod("admin_delete_topic_control_info",
"adminDeleteTopicCtrlInfo");
@@ -115,62 +115,8 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminAddTopicCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check and get topicName field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // get topicNameId info
- int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
- if (topicNameSet.size() == 1) {
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICNAMEID,
- false, TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- topicNameId = (int) result.getRetData();
- }
- // get authCtrlStatus info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- Boolean enableTopicAuth = (Boolean) result.retData1;
- // check and get max message size
- ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting(false);
- int maxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
- // check max message size
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MAXMSGSIZEINMB, false, maxMsgSizeMB,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
- }
- maxMsgSizeMB = (int) result.getRetData();
- // add records
- List<TopicProcessResult> retInfo =
- metaDataManager.addOrUpdTopicCtrlConf(opEntity, topicNameSet,
- topicNameId, enableTopicAuth, maxMsgSizeMB, sBuilder, result);
- return buildRetInfo(retInfo, sBuilder);
+ return innAddOrUpdTopicCtrlInfo(req, true);
+
}
/**
@@ -180,43 +126,68 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder adminBatchAddTopicCtrlInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdTopicCtrlInfo(req, true);
+ }
+
+ /**
+ * Modify topic control info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminModTopicCtrlInfo(HttpServletRequest req) {
+ return innAddOrUpdTopicCtrlInfo(req, false);
+ }
+
+ /**
+ * Modify new topic control record in batch
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminBatchModTopicCtrlInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdTopicCtrlInfo(req, false);
+ }
+
+ /**
+ * Delete topic control info
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDeleteTopicCtrlInfo(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
if (!WebParameterUtils.validReqAuthorizeInfo(req,
WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, true, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- BaseEntity defOpEntity = (BaseEntity) result.getRetData();
- // check and get add record map
- if (!getTopicCtrlJsonSetInfo(req, true,
- defOpEntity, null, sBuilder, result)) {
- WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
- return sBuilder;
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get topicName info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- Map<String, TopicCtrlEntity> addRecordMap =
- (Map<String, TopicCtrlEntity>) result.getRetData();
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ // delete records
List<TopicProcessResult> retInfo = new ArrayList<>();
- for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
- retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(topicCtrlInfo, sBuilder, result));
+ for (String topicName : topicNameSet) {
+ metaDataManager.delTopicCtrlConf(opEntity.getModifyUser(), topicName, sBuffer, result);
+ retInfo.add(new TopicProcessResult(
+ TBaseConstants.META_VALUE_UNDEFINED, topicName, result));
}
- return buildRetInfo(retInfo, sBuilder);
+ return buildRetInfo(retInfo, sBuffer);
}
- /**
- * Modify topic control info
- *
- * @param req
- * @return
- */
- // #lizard forgives
- public StringBuilder adminModTopicCtrlInfo(HttpServletRequest req) {
+ private StringBuilder innAddOrUpdTopicCtrlInfo(HttpServletRequest req, boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -226,12 +197,12 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- BaseEntity opInfoEntity = (BaseEntity) result.getRetData();
- // check and get topicName info
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, true, null, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
@@ -250,7 +221,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
}
// get authCtrlStatus info
if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
+ WebFieldDef.AUTHCTRLENABLE, false, (isAddOp ? false : null), result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
@@ -261,27 +232,25 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
int maxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
// check max message size
if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MAXMSGSIZEINMB, false, maxMsgSizeMB,
+ WebFieldDef.MAXMSGSIZEINMB, false,
+ (isAddOp ? maxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
maxMsgSizeMB = (int) result.getRetData();
- // modify records
- List<TopicProcessResult> retInfo =
- metaDataManager.addOrUpdTopicCtrlConf(opInfoEntity, topicNameSet,
- topicNameId, enableTopicAuth, maxMsgSizeMB, sBuilder, result);
+ // add or update records
+ TopicProcessResult retItem;
+ List<TopicProcessResult> retInfo = new ArrayList<>();
+ for (String topicName : topicNameSet) {
+ retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(isAddOp, opEntity,
+ topicName, topicNameId, enableTopicAuth, maxMsgSizeMB, sBuilder, result));
+ }
return buildRetInfo(retInfo, sBuilder);
}
- /**
- * Modify new topic control record in batch
- *
- * @param req
- * @return
- */
- public StringBuilder adminBatchModTopicCtrlInfo(HttpServletRequest req) {
+ private StringBuilder innBatchAddOrUpdTopicCtrlInfo(HttpServletRequest req, boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -291,71 +260,31 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
return sBuilder;
}
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
BaseEntity defOpEntity = (BaseEntity) result.getRetData();
- // check and get modify record map
- if (!getTopicCtrlJsonSetInfo(req, false,
- defOpEntity, null, sBuilder, result)) {
+ // check and get add record map
+ if (!getTopicCtrlJsonSetInfo(req, isAddOp, defOpEntity, sBuilder, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Map<String, TopicCtrlEntity> modRecordMap =
+ Map<String, TopicCtrlEntity> addRecordMap =
(Map<String, TopicCtrlEntity>) result.getRetData();
List<TopicProcessResult> retInfo = new ArrayList<>();
- for (TopicCtrlEntity topicCtrlInfo : modRecordMap.values()) {
- retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(topicCtrlInfo, sBuilder, result));
+ for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
+ retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(
+ isAddOp, topicCtrlInfo, sBuilder, result));
}
return buildRetInfo(retInfo, sBuilder);
}
- /**
- * Delete topic control info
- *
- * @param req
- * @return
- */
- public StringBuilder adminDeleteTopicCtrlInfo(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check and get topicName info
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, true, null, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Set<String> topicNameSet = (Set<String>) result.retData1;
- // delete records
- List<TopicProcessResult> retInfo = new ArrayList<>();
- for (String topicName : topicNameSet) {
- metaDataManager.delTopicCtrlConf(opEntity.getModifyUser(), topicName, sBuffer, result);
- retInfo.add(new TopicProcessResult(
- TBaseConstants.META_VALUE_UNDEFINED, topicName, result));
- }
- return buildRetInfo(retInfo, sBuffer);
- }
-
private boolean getTopicCtrlJsonSetInfo(HttpServletRequest req, boolean isAddOp,
- BaseEntity defOpEntity,
- List<Map<String, String>> defValue,
- StringBuilder sBuilder,
+ BaseEntity defOpEntity, StringBuilder sBuilder,
ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
- WebFieldDef.TOPICCTRLSET, true, defValue, result)) {
+ WebFieldDef.TOPICCTRLSET, true, null, result)) {
return result.success;
}
List<Map<String, String>> ctrlJsonArray =
@@ -366,37 +295,41 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
int defMaxMsgSizeMB = defClusterSetting.getMaxMsgSizeInMB();
// check and get topic control configure
TopicCtrlEntity itemConf;
+ Map<String, String> itemConfMap;
HashMap<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
for (int j = 0; j < ctrlJsonArray.size(); j++) {
- Map<String, String> confMap = ctrlJsonArray.get(j);
+ itemConfMap = ctrlJsonArray.get(j);
// check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(confMap, isAddOp, defOpEntity, result)) {
+ if (!WebParameterUtils.getAUDBaseInfo(itemConfMap,
+ isAddOp, defOpEntity, result)) {
return result.isSuccess();
}
BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
// get topicName configure info
- if (!WebParameterUtils.getStringParamValue(confMap,
+ if (!WebParameterUtils.getStringParamValue(itemConfMap,
WebFieldDef.TOPICNAME, true, "", result)) {
return result.success;
}
String topicName = (String) result.retData1;
// check max message size
- if (!WebParameterUtils.getIntParamValue(confMap,
- WebFieldDef.MAXMSGSIZEINMB, false, defMaxMsgSizeMB,
+ if (!WebParameterUtils.getIntParamValue(itemConfMap,
+ WebFieldDef.MAXMSGSIZEINMB, false,
+ (isAddOp ? defMaxMsgSizeMB : TBaseConstants.META_VALUE_UNDEFINED),
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) {
return result.isSuccess();
}
int itemMaxMsgSizeMB = (int) result.getRetData();
// get topicNameId field
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.TOPICNAMEID,
+ if (!WebParameterUtils.getIntParamValue(itemConfMap, WebFieldDef.TOPICNAMEID,
false, TBaseConstants.META_VALUE_UNDEFINED, 0, result)) {
return result.isSuccess();
}
int itemTopicNameId = (int) result.getRetData();
// get authCtrlStatus info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.AUTHCTRLENABLE, false, false, result)) {
+ if (!WebParameterUtils.getBooleanParamValue(itemConfMap,
+ WebFieldDef.AUTHCTRLENABLE, false,
+ (isAddOp ? false : null), result)) {
return result.isSuccess();
}
Boolean enableTopicAuth = (Boolean) result.retData1;