You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/04/29 06:47:37 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-574]Adjust
WebAdminGroupCtrlHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 2980d52 [INLONG-574]Adjust WebAdminGroupCtrlHandler class implementation
2980d52 is described below
commit 2980d52193d5fc681baa724f7735b144eabe452c
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Apr 27 20:04:46 2021 +0800
[INLONG-574]Adjust WebAdminGroupCtrlHandler class implementation
---
.../tubemq/server/common/fielddef/WebFieldDef.java | 12 +-
.../server/common/paramcheck/PBParameterUtils.java | 24 +-
.../server/common/utils/WebParameterUtils.java | 190 ++-
.../bdbentitys/BdbGroupFlowCtrlEntity.java | 35 -
.../server/master/metamanage/MetaDataManager.java | 302 +++-
.../metastore/BdbMetaStoreServiceImpl.java | 17 +-
.../metamanage/metastore/MetaStoreService.java | 6 +-
.../metastore/dao/entity/BaseEntity.java | 32 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 28 +
.../dao/entity/GroupConsumeCtrlEntity.java | 2 +-
.../metastore/dao/entity/GroupResCtrlEntity.java | 114 +-
.../dao/mapper/GroupConsumeCtrlMapper.java | 2 +
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 18 +
.../web/handler/WebAdminFlowRuleHandler.java | 491 ++----
.../web/handler/WebAdminGroupCtrlHandler.java | 1653 ++++++++------------
.../web/handler/WebAdminTopicAuthHandler.java | 560 +++----
.../master/web/handler/WebBrokerConfHandler.java | 24 +-
.../web/handler/WebGroupConsumeCtrlHandler.java | 40 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 85 +-
.../master/web/handler/WebMasterInfoHandler.java | 338 ++--
.../master/web/handler/WebOtherInfoHandler.java | 8 +-
.../master/web/handler/WebTopicCtrlHandler.java | 15 +-
.../master/web/handler/WebTopicDeployHandler.java | 26 +-
23 files changed, 1824 insertions(+), 2198 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index c20a272..c924b62 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -233,7 +233,17 @@ public enum WebFieldDef {
"Allowed broker client rate, same as alwdBrokerClientRate", RegexDef.TMP_NUMBER),
@Deprecated
GROUPJSONSET(83, "groupNameJsonSet", "gJsonSet", WebFieldType.JSONSET,
- "The black list group set that needs to be added or modified");
+ "The black list group set that needs to be added or modified"),
+ REJOINWAIT(84, "reJoinWait", "rjWait", WebFieldType.INT,
+ "The duration for consumer rejoin rebalance", RegexDef.TMP_NUMBER),
+
+ COMPSCONSUMERID(85, "consumerId", "csmId", WebFieldType.COMPSTRING,
+ "consumer id", TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH,
+ RegexDef.TMP_CONSUMERID),
+ ISENABLE(86, "isEnable", "isEnable",
+ WebFieldType.BOOLEAN, "With status if enable.");
+
+
public final int id;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index c27ad5b..b30f091 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -520,7 +520,7 @@ public class PBParameterUtils {
result.setFailResult(strBuffer.append("Request miss necessary ")
.append(fieldDef.name).append(" data!").toString());
strBuffer.delete(0, strBuffer.length());
- return result.success;
+ return result.isSuccess();
}
String tmpValue = paramValue.trim();
if (tmpValue.length() > fieldDef.valMaxLen) {
@@ -528,10 +528,10 @@ public class PBParameterUtils {
.append("'s length over max value, allowed max length is ")
.append(fieldDef.valMaxLen).toString());
strBuffer.delete(0, strBuffer.length());
- return result.success;
+ return result.isSuccess();
}
result.setSuccResult(tmpValue);
- return result.success;
+ return result.isSuccess();
}
/**
@@ -549,9 +549,9 @@ public class PBParameterUtils {
ProcessResult result) {
if (!getStringParameter(WebFieldDef.TOPICNAME,
topicName, strBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String tmpValue = (String) result.retData1;
+ String tmpValue = (String) result.getRetData();
if (metadataManager.getTopicMetadata(tmpValue) == null) {
result.setFailResult(TErrCodeConstants.FORBIDDEN,
strBuffer.append(WebFieldDef.TOPICNAME.name)
@@ -559,7 +559,7 @@ public class PBParameterUtils {
.append(" not existed, please check your configure").toString());
strBuffer.delete(0, strBuffer.length());
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -578,9 +578,9 @@ public class PBParameterUtils {
ProcessResult result) {
if (!getStringParameter(WebFieldDef.TOPICNAME,
topicName, strBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String tmpValue = (String) result.retData1;
+ String tmpValue = (String) result.getRetData();
TopicMetadata topicMetadata = metadataManager.getTopicMetadata(tmpValue);
if (topicMetadata == null) {
result.setFailResult(TErrCodeConstants.FORBIDDEN,
@@ -588,7 +588,7 @@ public class PBParameterUtils {
.append(" ").append(tmpValue)
.append(" not existed, please check your configure").toString());
strBuffer.delete(0, strBuffer.length());
- return result.success;
+ return result.isSuccess();
}
if (metadataManager.isClosedTopic(tmpValue)) {
result.setFailResult(TErrCodeConstants.FORBIDDEN,
@@ -596,7 +596,7 @@ public class PBParameterUtils {
.append(" ").append(tmpValue)
.append(" has been closed").toString());
strBuffer.delete(0, strBuffer.length());
- return result.success;
+ return result.isSuccess();
}
int realPartition = partitionId < TBaseConstants.META_STORE_INS_BASE
? partitionId : partitionId % TBaseConstants.META_STORE_INS_BASE;
@@ -606,9 +606,9 @@ public class PBParameterUtils {
.append(" ").append(tmpValue).append("-").append(partitionId)
.append(" not existed, please check your configure").toString());
strBuffer.delete(0, strBuffer.length());
- return result.success;
+ return result.isSuccess();
}
result.setSuccResult(topicMetadata);
- return result.success;
+ return result.isSuccess();
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 0154bf1..9219a00 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -59,65 +59,65 @@ public class WebParameterUtils {
private static final List<Integer> allowedPriorityVal = Arrays.asList(1, 2, 3);
- public static StringBuilder buildFailResult(StringBuilder strBuffer, String errMsg) {
- return strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ public static StringBuilder buildFailResult(StringBuilder sBuffer, String errMsg) {
+ return sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append(errMsg).append("\"}");
}
public static StringBuilder buildFailResultWithBlankData(String errMsg,
- StringBuilder strBuffer) {
- return buildFailResultWithBlankData(400, errMsg, strBuffer);
+ StringBuilder sBuffer) {
+ return buildFailResultWithBlankData(400, errMsg, sBuffer);
}
public static StringBuilder buildFailResultWithBlankData(int errcode, String errMsg,
- StringBuilder strBuffer) {
- return strBuffer.append("{\"result\":false,\"errCode\":").append(errcode)
+ StringBuilder sBuffer) {
+ return sBuffer.append("{\"result\":false,\"errCode\":").append(errcode)
.append(",\"errMsg\":\"").append(errMsg).append("\",\"data\":[]}");
}
- public static StringBuilder buildSuccessResult(StringBuilder strBuffer) {
- return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ public static StringBuilder buildSuccessResult(StringBuilder sBuffer) {
+ return sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[]}");
}
- public static StringBuilder buildSuccessResult(StringBuilder strBuffer, String appendInfo) {
- return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
- append(appendInfo).append("\"}");
+ public static StringBuilder buildSuccessResult(StringBuilder sBuffer, String appendInfo) {
+ return sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
+ append(appendInfo).append("\",\"data\":[]}");
}
- public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder strBuffer) {
- return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder sBuffer) {
+ return sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
}
public static StringBuilder buildSuccessWithDataRetEnd(
- StringBuilder strBuffer, int totalCnt) {
- return strBuffer.append("],\"count\":").append(totalCnt).append("}");
+ StringBuilder sBuffer, int totalCnt) {
+ return sBuffer.append("],\"count\":").append(totalCnt).append("}");
}
public static StringBuilder buildSuccWithData(long dataVerId,
- StringBuilder strBuffer) {
+ StringBuilder sBuffer) {
List<Long> dataVerIds = new ArrayList<>(1);
dataVerIds.add(dataVerId);
- return buildSuccWithData("Ok", dataVerIds, strBuffer);
+ return buildSuccWithData("Ok", dataVerIds, sBuffer);
}
public static StringBuilder buildSuccWithData(String errMsg,
List<Long> dataVerIds,
- StringBuilder strBuffer) {
+ StringBuilder sBuffer) {
int count = 0;
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"")
+ sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"")
.append(errMsg).append("\",\"data\":[");
for (Long dataVerId : dataVerIds) {
if (dataVerId == null) {
continue;
}
if (count++ > 0) {
- strBuffer.append(",");
+ sBuffer.append(",");
}
- strBuffer.append("{\"").append(WebFieldDef.DATAVERSIONID.name)
+ sBuffer.append("{\"").append(WebFieldDef.DATAVERSIONID.name)
.append("\":").append(dataVerId).append("}");
}
- strBuffer.append("],\"count\":").append(count).append("}");
- return strBuffer;
+ sBuffer.append("],\"count\":").append(count).append("}");
+ return sBuffer;
}
public static <T> boolean getAUDBaseInfo(T paramCntr, boolean isAdd,
@@ -129,7 +129,7 @@ public class WebParameterUtils {
false, TBaseConstants.META_VALUE_UNDEFINED, sBuffer, result)) {
return result.isSuccess();
}
- long dataVerId = (long) result.retData1;
+ long dataVerId = (long) result.getRetData();
// check and get createUser or modifyUser
String createUsr = "";
Date createDate = null;
@@ -141,14 +141,14 @@ public class WebParameterUtils {
sBuffer, result)) {
return result.isSuccess();
}
- createUsr = (String) result.retData1;
+ createUsr = (String) result.getRetData();
// check and get create date
if (!WebParameterUtils.getDateParameter(paramCntr, WebFieldDef.CREATEDATE, false,
(defOpEntity == null ? new Date() : defOpEntity.getCreateDate()),
sBuffer, result)) {
return result.isSuccess();
}
- createDate = (Date) result.retData1;
+ createDate = (Date) result.getRetData();
}
// check modify user field
if (!WebParameterUtils.getStringParamValue(paramCntr, WebFieldDef.MODIFYUSER,
@@ -157,14 +157,14 @@ public class WebParameterUtils {
sBuffer, result)) {
return result.isSuccess();
}
- String modifyUser = (String) result.retData1;
+ String modifyUser = (String) result.getRetData();
// check and get modify date
if (!WebParameterUtils.getDateParameter(paramCntr, WebFieldDef.MODIFYDATE, false,
(defOpEntity == null ? createDate : defOpEntity.getModifyDate()),
sBuffer, result)) {
return result.isSuccess();
}
- Date modifyDate = (Date) result.retData1;
+ Date modifyDate = (Date) result.getRetData();
result.setSuccResult(new BaseEntity(dataVerId,
createUsr, createDate, modifyUser, modifyDate));
return result.isSuccess();
@@ -185,22 +185,21 @@ public class WebParameterUtils {
false, TBaseConstants.META_VALUE_UNDEFINED, sBuffer, result)) {
return result.isSuccess();
}
- long dataVerId = (long) result.retData1;
+ long dataVerId = (long) result.getRetData();
// check createUser user field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.CREATEUSER, false, null, sBuffer, result)) {
return result.isSuccess();
}
- String createUser = (String) result.retData1;
+ String createUser = (String) result.getRetData();
// check modify user field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.MODIFYUSER, false, null, sBuffer, result)) {
return result.isSuccess();
}
- String modifyUser = (String) result.retData1;
+ String modifyUser = (String) result.getRetData();
// set query keys
- qryEntity.updBaseModifyInfo(dataVerId,
- createUser, null, modifyUser, null, null);
+ qryEntity.updQueryKeyInfo(dataVerId, createUser, modifyUser);
result.setSuccResult(qryEntity);
return result.isSuccess();
}
@@ -211,9 +210,9 @@ public class WebParameterUtils {
ProcessResult result) {
if (!getIntParamValue(paramCntr, WebFieldDef.QRYPRIORITYID,
required, defValue, minValue, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- int qryPriorityId = (int) result.retData1;
+ int qryPriorityId = (int) result.getRetData();
if (qryPriorityId > 303 || qryPriorityId < 101) {
result.setFailResult(sBuffer.append("Illegal value in ")
.append(WebFieldDef.QRYPRIORITYID.name)
@@ -260,7 +259,7 @@ public class WebParameterUtils {
WebFieldDef.DELETEPOLICY, required, defValue, sBuffer, result)) {
return result.isSuccess();
}
- String delPolicy = (String) result.retData1;
+ String delPolicy = (String) result.getRetData();
if (TStringUtils.isBlank(delPolicy)) {
return result.isSuccess();
}
@@ -660,12 +659,12 @@ public class WebParameterUtils {
StringBuilder sBuffer, ProcessResult result) {
if (!getStringParamValue(paramCntr, fieldDef,
required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String paramValue = (String) result.retData1;
+ String paramValue = (String) result.getRetData();
if (paramValue == null) {
result.setSuccResult(defValue);
- return result.success;
+ return result.isSuccess();
}
try {
long paramIntVal = Long.parseLong(paramValue);
@@ -675,7 +674,36 @@ public class WebParameterUtils {
.append(" parse error: ").append(e.getMessage()).toString());
sBuffer.delete(0, sBuffer.length());
}
- return result.success;
+ return result.isSuccess();
+ }
+
+    /**
+     * Parse the statusId parameter into a Boolean and store it in the process result
+     *
+     * @param paramCntr   parameter container object
+     * @param required    whether the parameter is required
+     * @param defValue    default value, stored when the parsed statusId is undefined
+     * @param sBuffer     string buffer passed to the parameter parser
+     * @param result      process result; on success getRetData() holds the Boolean value
+     * @return true if the parameter was processed successfully, otherwise false
+     */
+    public static <T> boolean getFlowCtrlStatusParamValue(T paramCntr, boolean required,
+                                                          Boolean defValue, StringBuilder sBuffer,
+                                                          ProcessResult result) {
+        // check and get statusId field
+        if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.STATUSID, required,
+                TBaseConstants.META_VALUE_UNDEFINED, 0, 1, sBuffer, result)) {
+            return result.isSuccess();
+        }
+        int paramValue = (int) result.getRetData();
+        if (paramValue == TBaseConstants.META_VALUE_UNDEFINED) {
+            result.setSuccResult(defValue);
+        } else if (paramValue == 1) {
+            result.setSuccResult(Boolean.TRUE);
+        } else {
+            result.setSuccResult(Boolean.FALSE);
+        }
+        return result.isSuccess();
+    }
/**
@@ -746,38 +774,38 @@ public class WebParameterUtils {
StringBuilder sBuffer, ProcessResult result) {
if (!getStringParamValue(paramCntr, fieldDef,
required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
if (fieldDef.isCompFieldType()) {
Set<Integer> tgtValueSet = new HashSet<>();
- Set<String> valItemSet = (Set<String>) result.retData1;
+ Set<String> valItemSet = (Set<String>) result.getRetData();
if (valItemSet.isEmpty()) {
if (hasDefVal) {
tgtValueSet.add(defValue);
}
result.setSuccResult(tgtValueSet);
- return result.success;
+ return result.isSuccess();
}
for (String itemVal : valItemSet) {
if (!checkIntValueNorms(fieldDef, itemVal,
hasMinVal, minValue, hasMaxVal, maxValue, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
tgtValueSet.add((Integer) result.retData1);
}
result.setSuccResult(tgtValueSet);
} else {
- String paramValue = (String) result.retData1;
+ String paramValue = (String) result.getRetData();
if (paramValue == null) {
if (hasDefVal) {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
checkIntValueNorms(fieldDef, paramValue,
hasMinVal, minValue, hasMaxVal, maxValue, sBuffer, result);
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -796,12 +824,12 @@ public class WebParameterUtils {
ProcessResult result) {
if (!getStringParamValue(paramCntr, fieldDef,
required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String paramValue = (String) result.retData1;
+ String paramValue = (String) result.getRetData();
if (paramValue == null) {
result.setSuccResult(defValue);
- return result.success;
+ return result.isSuccess();
}
if (paramValue.equalsIgnoreCase("true")
|| paramValue.equalsIgnoreCase("false")) {
@@ -813,7 +841,7 @@ public class WebParameterUtils {
result.setSuccResult(defValue);
}
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -878,7 +906,7 @@ public class WebParameterUtils {
} else {
procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
}
- return result.success;
+ return result.isSuccess();
}
// check if value is norm;
if (fieldDef.isCompFieldType()) {
@@ -890,7 +918,7 @@ public class WebParameterUtils {
continue;
}
if (!checkStrValueNorms(fieldDef, strParamValueItem, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
valItemSet.add((String) result.retData1);
}
@@ -903,7 +931,7 @@ public class WebParameterUtils {
} else {
procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
}
- return result.success;
+ return result.isSuccess();
}
// check max item count
if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
@@ -918,11 +946,11 @@ public class WebParameterUtils {
result.setSuccResult(valItemSet);
} else {
if (!checkStrValueNorms(fieldDef, paramValue, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
result.setSuccResult(paramValue);
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -943,9 +971,9 @@ public class WebParameterUtils {
ProcessResult result) {
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, required, defValue, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
Set<String> existedTopicSet =
confManager.getTotalConfiguredTopicNames();
for (String topic : topicNameSet) {
@@ -957,7 +985,7 @@ public class WebParameterUtils {
break;
}
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -974,9 +1002,9 @@ public class WebParameterUtils {
StringBuilder sBuffer,
ProcessResult result) {
if (!getFilterCondSet(paramCntr, required, false, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- Set<String> filterCondSet = (Set<String>) result.retData1;
+ Set<String> filterCondSet = (Set<String>) result.getRetData();
if (filterCondSet.isEmpty()) {
if (transBlank) {
sBuffer.append(TServerConstants.BLANK_FILTER_ITEM_STR);
@@ -989,7 +1017,7 @@ public class WebParameterUtils {
}
result.setSuccResult(sBuffer.toString());
sBuffer.delete(0, sBuffer.length());
- return result.success;
+ return result.isSuccess();
}
/**
@@ -1007,12 +1035,12 @@ public class WebParameterUtils {
ProcessResult result) {
if (!WebParameterUtils.getStringParamValue(paramCntr,
WebFieldDef.FILTERCONDS, required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
if (transCondItem) {
// translate filter condition item with "''"
TreeSet<String> newFilterCondSet = new TreeSet<>();
- Set<String> filterCondSet = (Set<String>) result.retData1;
+ Set<String> filterCondSet = (Set<String>) result.getRetData();
if (!filterCondSet.isEmpty()) {
for (String filterCond : filterCondSet) {
newFilterCondSet.add(sBuffer.append(TokenConstants.ARRAY_SEP)
@@ -1023,7 +1051,7 @@ public class WebParameterUtils {
}
result.setSuccResult(newFilterCondSet);
}
- return result.success;
+ return result.isSuccess();
}
@@ -1080,7 +1108,7 @@ public class WebParameterUtils {
} else {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
try {
paramValue = URLDecoder.decode(paramValue,
@@ -1098,7 +1126,7 @@ public class WebParameterUtils {
} else {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
if (paramValue.length() > fieldDef.valMaxLen) {
@@ -1106,7 +1134,7 @@ public class WebParameterUtils {
.append("Parameter ").append(fieldDef.name)
.append("'s length over max allowed length (")
.append(fieldDef.valMaxLen).append(")!").toString());
- return result.success;
+ return result.isSuccess();
}
}
// parse data
@@ -1120,7 +1148,7 @@ public class WebParameterUtils {
.append(" value parse failure, error is ")
.append(e.getMessage()).append("!").toString());
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -1156,7 +1184,7 @@ public class WebParameterUtils {
} else {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
try {
paramValue = URLDecoder.decode(paramValue,
@@ -1174,7 +1202,7 @@ public class WebParameterUtils {
} else {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
if (paramValue.length() > fieldDef.valMaxLen) {
@@ -1182,7 +1210,7 @@ public class WebParameterUtils {
.append("Parameter ").append(fieldDef.name)
.append("'s length over max allowed length (")
.append(fieldDef.valMaxLen).append(")!").toString());
- return result.success;
+ return result.isSuccess();
}
}
// parse data
@@ -1196,7 +1224,7 @@ public class WebParameterUtils {
.append(" value parse failure, error is ")
.append(e.getMessage()).append("!").toString());
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -1215,12 +1243,12 @@ public class WebParameterUtils {
ProcessResult result) {
if (!getStringParamValue(paramCntr, fieldDef,
required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String paramValue = (String) result.retData1;
+ String paramValue = (String) result.getRetData();
if (paramValue == null) {
result.setSuccResult(defValue);
- return result.success;
+ return result.isSuccess();
}
try {
DateFormat sdf = new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
@@ -1231,7 +1259,7 @@ public class WebParameterUtils {
.append(" parse error: ").append(e.getMessage()).toString());
sBuffer.delete(0, sBuffer.length());
}
- return result.success;
+ return result.isSuccess();
}
@@ -1248,15 +1276,15 @@ public class WebParameterUtils {
boolean required, TMaster master,
StringBuilder sBuffer, ProcessResult result) {
if (!getStringParamValue(req, fieldDef, required, null, sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
}
- String paramValue = (String) result.retData1;
+ String paramValue = (String) result.getRetData();
if (paramValue != null) {
if (!paramValue.equals(master.getMasterConfig().getConfModAuthToken())) {
result.setFailResult("Illegal access, unauthorized request!");
}
}
- return result.success;
+ return result.isSuccess();
}
/**
@@ -1280,7 +1308,7 @@ public class WebParameterUtils {
} else {
result.setSuccResult(defValue);
}
- return result.success;
+ return result.isSuccess();
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
index 25470e1..144b7e1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
@@ -237,40 +236,6 @@ public class BdbGroupFlowCtrlEntity implements Serializable {
String.valueOf(resCheckStatus.getCode()));
}
- public EnableStatus getConsumeEnable() {
- String atrVal =
- TStringUtils.getAttrValFrmAttributes(this.attributes,
- TStoreConstants.TOKEN_ENABLE_CONSUME);
- if (atrVal != null) {
- return EnableStatus.valueOf(Integer.parseInt(atrVal));
- }
- return EnableStatus.STATUS_ENABLE;
- }
-
- public void setConsumeEnable(EnableStatus enableConsume) {
- this.attributes =
- TStringUtils.setAttrValToAttributes(this.attributes,
- TStoreConstants.TOKEN_ENABLE_CONSUME,
- String.valueOf(enableConsume.getCode()));
- }
-
-
- public String getDisableConsumeReason() {
- if (TStringUtils.isNotBlank(attributes)
- && attributes.contains(TokenConstants.EQ)) {
- return TStringUtils.getAttrValFrmAttributes(
- this.attributes, TStoreConstants.TOKEN_BLK_REASON);
- } else {
- return "";
- }
- }
-
- public void setDisableConsumeReason(String disableConsumeReason) {
- this.attributes =
- TStringUtils.setAttrValToAttributes(this.attributes,
- TStoreConstants.TOKEN_BLK_REASON, disableConsumeReason);
- }
-
public int getAllowedBrokerClientRate() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index d34af08..80cfb32 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -275,19 +275,6 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
}
- // check if consumer group is in the blacklist
- GroupResCtrlEntity resCtrlEntity =
- metaStoreService.getGroupResCtrlConf(groupName);
- if (resCtrlEntity != null && !resCtrlEntity.isEnableConsume()) {
- if (!resCtrlEntity.isEnableConsume()) {
- result.setFailResult(TErrCodeConstants.CONSUME_GROUP_FORBIDDEN,
- sBuffer.append("[unAuthorized Group] ").append(consumerId)
- .append("'s consumerGroup in blackList by administrator, reason is ")
- .append(resCtrlEntity.getDisableReason()).toString());
- sBuffer.delete(0, sBuffer.length());
- return result.isSuccess();
- }
- }
// check if group enable consume
Set<String> disableCsmTopicSet = new HashSet<>();
Set<String> enableFltCsmTopicSet = new HashSet<>();
@@ -300,13 +287,13 @@ public class MetaDataManager implements Server {
continue;
}
if (topicEntity.isAuthCtrlEnable()) {
- //check if consume group is allowed to consume
+ // check if consume group is allowed to consume
GroupConsumeCtrlEntity ctrlEntity =
metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicItem);
- if (ctrlEntity == null) {
+ if (ctrlEntity == null || !ctrlEntity.isEnableConsume()) {
disableCsmTopicSet.add(topicItem);
}
- //check if consume group is required filter consume
+ // check if consume group is required filter consume
if (ctrlEntity.isEnableFilterConsume()) {
enableFltCsmTopicSet.add(topicItem);
}
@@ -325,6 +312,69 @@ public class MetaDataManager implements Server {
enableFltCsmTopicSet, reqTopicCondMap, sBuffer, result);
}
+    /** Verify that the filter items requested for restricted topics are all authorized. */
+    private boolean checkConsumeRstrTopics(final String groupName, final String consumerId,
+            Set<String> enableFltCsmTopicSet, Map<String, TreeSet<String>> reqTopicCondMap,
+            StringBuilder sBuffer, ProcessResult result) {
+        // a null or empty set means there is no topic to check, so the check passes
+        if (enableFltCsmTopicSet == null || enableFltCsmTopicSet.isEmpty()) {
+            result.setSuccResult("Ok!");
+            return result.isSuccess();
+        }
+        for (String topicName : enableFltCsmTopicSet) {
+            GroupConsumeCtrlEntity ctrlEntity =
+                    metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
+            if (ctrlEntity == null || !ctrlEntity.isEnableFilterConsume()) {
+                continue;
+            }
+            String allowedCondStr = ctrlEntity.getFilterCondStr();
+            if (allowedCondStr.length() == 2
+                    && allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
+                result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+                        sBuffer.append("[Restricted Group] ").append(consumerId)
+                                .append(" : ").append(groupName)
+                                .append(" not allowed to consume any data of topic ")
+                                .append(topicName).toString());
+                sBuffer.delete(0, sBuffer.length());
+                return result.isSuccess();
+            }
+            TreeSet<String> condItemSet = reqTopicCondMap.get(topicName);
+            if (condItemSet == null || condItemSet.isEmpty()) {
+                result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+                        sBuffer.append("[Restricted Group] ").append(consumerId)
+                                .append(" : ").append(groupName)
+                                .append(" must set the filter conditions of topic ")
+                                .append(topicName).toString());
+                sBuffer.delete(0, sBuffer.length());
+                return result.isSuccess();
+            }
+            Map<String, List<String>> unAuthorizedCondMap = new HashMap<>();
+            for (String item : condItemSet) {
+                if (!allowedCondStr.contains(sBuffer.append(TokenConstants.ARRAY_SEP)
+                        .append(item).append(TokenConstants.ARRAY_SEP).toString())) {
+                    List<String> unAuthConds = unAuthorizedCondMap.get(topicName);
+                    if (unAuthConds == null) {
+                        unAuthConds = new ArrayList<>();
+                        unAuthorizedCondMap.put(topicName, unAuthConds);
+                    }
+                    unAuthConds.add(item);
+                }
+                sBuffer.delete(0, sBuffer.length());
+            }
+            if (!unAuthorizedCondMap.isEmpty()) {
+                result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
+                        sBuffer.append("[Restricted Group] ").append(consumerId)
+                                .append(" : unAuthorized filter conditions ")
+                                .append(unAuthorizedCondMap).toString());
+                sBuffer.delete(0, sBuffer.length());
+                return result.isSuccess();
+            }
+        }
+        result.setSuccResult("Ok!");
+        return result.isSuccess();
+    }
+
+
private boolean checkFilterRstrTopics(final String groupName, final String consumerId,
Set<String> enableFltCsmTopicSet,
Map<String, TreeSet<String>> reqTopicCondMap,
@@ -393,23 +443,23 @@ public class MetaDataManager implements Server {
/**
* Add broker configure information
*
- * @param sBuilder the print information string buffer
+ * @param sBuffer the print information string buffer
* @param result the process result return
* @return true if success otherwise false
- */
+ */
public BrokerProcessResult addOrUpdBrokerConfig(boolean isAddOp, BaseEntity opInfoEntity,
int brokerId, String brokerIp, int brokerPort,
int brokerTlsPort, int brokerWebPort,
int regionId, int groupId,
ManageStatus mngStatus,
TopicPropGroup topicProps,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
BrokerConfEntity entity =
new BrokerConfEntity(opInfoEntity, brokerId, brokerIp);
entity.updModifyInfo(opInfoEntity.getDataVerId(), brokerPort,
brokerTlsPort, brokerWebPort, regionId, groupId, mngStatus, topicProps);
- return addOrUpdBrokerConfig(isAddOp, entity, sBuilder, result);
+ return addOrUpdBrokerConfig(isAddOp, entity, sBuffer, result);
}
public BrokerProcessResult addOrUpdBrokerConfig(boolean isAddOp, BrokerConfEntity entity,
@@ -1452,6 +1502,48 @@ public class MetaDataManager implements Server {
}
/**
+ * Add or update topic control configure info.
+ *
+ * Builds a TopicCtrlEntity from the operator info and the topic name, fills in
+ * the data version id and the auth control flag (topic id and max message size
+ * are passed as META_VALUE_UNDEFINED), then delegates to the entity-based
+ * addOrUpdTopicCtrlConf overload.
+ *
+ * @param opEntity the operator information, also supplies the data version id
+ * @param topicName the topic name
+ * @param enableTopicAuth the auth control flag handed to updModifyInfo
+ * @param sBuffer the print info string buffer
+ * @param result the process result return
+ * @return the TopicProcessResult produced by the entity-based overload
+ */
+ public TopicProcessResult addOrUpdTopicCtrlConf(BaseEntity opEntity, String topicName,
+ Boolean enableTopicAuth, StringBuilder sBuffer,
+ ProcessResult result) {
+ TopicCtrlEntity entity =
+ new TopicCtrlEntity(opEntity, topicName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, enableTopicAuth);
+ return addOrUpdTopicCtrlConf(entity, sBuffer, result);
+ }
+
+ public TopicProcessResult addOrUpdTopicCtrlConf(TopicCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ TopicCtrlEntity newEntity;
+ TopicCtrlEntity curEntity =
+ metaStoreService.getTopicCtrlConf(entity.getTopicName());
+ if (curEntity == null) {
+ newEntity = new TopicCtrlEntity(entity, entity.getTopicName());
+ newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(),
+ entity.getMaxMsgSizeInMB(), entity.isAuthCtrlEnable());
+ metaStoreService.addTopicCtrlConf(newEntity, sBuffer, result);
+ } else {
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(),
+ entity.getMaxMsgSizeInMB(), entity.isAuthCtrlEnable())) {
+ metaStoreService.updTopicCtrlConf(newEntity, sBuffer, result);
+ } else {
+ result.setSuccResult(null);
+ }
+ }
+ return new TopicProcessResult(0, entity.getTopicName(), result);
+ }
+
+ /**
* Delete topic control configure
*
* @param operator operator
@@ -1652,6 +1744,39 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
+ /**
+ * Add or update cluster default setting.
+ *
+ * When no cluster configure is stored yet, a new entity is created from
+ * opEntity, filled with default values, updated with the given field values
+ * and added to the meta store. Otherwise a clone of the stored configure is
+ * updated and written back only if updModifyInfo reports a change; when
+ * nothing changed the result is set to success without a store call.
+ *
+ * The values from brokerPort through topicProps are forwarded unchanged to
+ * ClusterSettingEntity.updModifyInfo.
+ *
+ * @param opEntity the operator information, also supplies the data version id
+ * @param strBuffer the string buffer handed to the meta store calls
+ * @param result the process result return
+ * @return result.isSuccess() once the add/update has been processed
+ */
+ public boolean addOrUpdClusterDefSetting(BaseEntity opEntity, int brokerPort,
+ int brokerTlsPort, int brokerWebPort,
+ int maxMsgSizeMB, int qryPriorityId,
+ Boolean flowCtrlEnable, int flowRuleCnt,
+ String flowCtrlInfo, TopicPropGroup topicProps,
+ StringBuilder strBuffer, ProcessResult result) {
+ ClusterSettingEntity newConf;
+ ClusterSettingEntity curConf = metaStoreService.getClusterConfig();
+ if (curConf == null) {
+ newConf = new ClusterSettingEntity(opEntity);
+ newConf.fillDefaultValue();
+ newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps);
+ metaStoreService.addClusterConfig(newConf, strBuffer, result);
+ } else {
+ newConf = curConf.clone();
+ newConf.updBaseModifyInfo(opEntity);
+ if (newConf.updModifyInfo(opEntity.getDataVerId(), brokerPort,
+ brokerTlsPort, brokerWebPort, maxMsgSizeMB, qryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo, topicProps)) {
+ metaStoreService.updClusterConfig(newConf, strBuffer, result);
+ } else {
+ result.setSuccResult(null);
+ }
+ }
+ return result.isSuccess();
+ }
public ClusterSettingEntity getClusterDefSetting(boolean isMustConf) {
ClusterSettingEntity curClsSetting =
@@ -1665,37 +1790,35 @@ public class MetaDataManager implements Server {
// //////////////////////////////////////////////////////////////////////////////
public GroupProcessResult addOrUpdGroupResCtrlConf(boolean isAddOp, BaseEntity opEntity,
- String groupName, Boolean consumeEnable,
- String disableRsn, Boolean resCheckEnable,
+ String groupName, Boolean resCheckEnable,
int allowedBClientRate, int qryPriorityId,
Boolean flowCtrlEnable, int flowRuleCnt,
- String flowCtrlInfo, StringBuilder sBuilder,
+ String flowCtrlInfo, StringBuilder sBuffer,
ProcessResult result) {
GroupResCtrlEntity entity =
new GroupResCtrlEntity(opEntity, groupName);
- entity.updModifyInfo(opEntity.getDataVerId(),
- consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
+ entity.updModifyInfo(opEntity.getDataVerId(), resCheckEnable, allowedBClientRate,
qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
- return addOrUpdGroupResCtrlConf(isAddOp, entity, sBuilder, result);
+ return addOrUpdGroupResCtrlConf(isAddOp, entity, sBuffer, result);
}
/**
* Add group resource control configure info
*
* @param entity the group resource control info entity will be add
- * @param sBuilder the print info string buffer
+ * @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
public GroupProcessResult addOrUpdGroupResCtrlConf(boolean isAddOp,
GroupResCtrlEntity entity,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
GroupResCtrlEntity curEntity =
metaStoreService.getGroupResCtrlConf(entity.getGroupName());
if (isAddOp) {
if (curEntity == null) {
- metaStoreService.addGroupResCtrlConf(entity, sBuilder, result);
+ metaStoreService.addGroupResCtrlConf(entity, sBuffer, result);
} else {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
DataOpErrCode.DERR_EXISTED.getDescription());
@@ -1707,11 +1830,10 @@ public class MetaDataManager implements Server {
} else {
GroupResCtrlEntity newEntity = curEntity.clone();
newEntity.updBaseModifyInfo(entity);
- if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
- entity.getDisableReason(), entity.isEnableResCheck(),
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableResCheck(),
entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo())) {
- metaStoreService.updGroupResCtrlConf(newEntity, sBuilder, result);
+ metaStoreService.updGroupResCtrlConf(newEntity, sBuffer, result);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
DataOpErrCode.DERR_UNCHANGED.getDescription());
@@ -1730,17 +1852,28 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
- public GroupProcessResult enOrDisConsumeCtrlConf(BaseEntity opEntity, String groupName,
- boolean enableConsume, String disReason,
- StringBuilder sBuffer, ProcessResult result) {
+ // Attention: compatible implementation for the old API
+ public GroupProcessResult addOrUpdGroupResCtrlConf(BaseEntity opEntity, String groupName,
+ Boolean resChkEnable, int allowedB2CRate,
+ StringBuilder sBuffer, ProcessResult result) {
GroupResCtrlEntity newEntity = new GroupResCtrlEntity(opEntity, groupName);
- newEntity.updModifyInfo(opEntity.getDataVerId(), enableConsume, disReason,
- null, TBaseConstants.META_VALUE_UNDEFINED,
+ newEntity.updModifyInfo(opEntity.getDataVerId(), resChkEnable, allowedB2CRate,
TBaseConstants.META_VALUE_UNDEFINED, null,
TBaseConstants.META_VALUE_UNDEFINED, null);
return addOrUpdGroupResCtrlConf(newEntity, sBuffer, result);
}
+ // Attention: compatible implementation for the old API
+ public GroupProcessResult addOrUpdGroupResCtrlConf(BaseEntity opEntity, String groupName,
+ int qryPriorityId, Boolean flowCtrlEnable,
+ int flowRuleCnt, String flowCtrlRuleInfo,
+ StringBuilder sBuffer, ProcessResult result) {
+ GroupResCtrlEntity newEntity = new GroupResCtrlEntity(opEntity, groupName);
+ newEntity.updModifyInfo(opEntity.getDataVerId(), null,
+ TBaseConstants.META_VALUE_UNDEFINED, qryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlRuleInfo);
+ return addOrUpdGroupResCtrlConf(newEntity, sBuffer, result);
+ }
/**
* add or update if present configure info
*
@@ -1749,6 +1882,7 @@ public class MetaDataManager implements Server {
* @param result the process result return
* @return true if success otherwise false
*/
+ // Attention: compatible implementation for the old API
public GroupProcessResult addOrUpdGroupResCtrlConf(GroupResCtrlEntity entity,
StringBuilder sBuffer,
ProcessResult result) {
@@ -1758,16 +1892,14 @@ public class MetaDataManager implements Server {
if (curEntity == null) {
newEntity = new GroupResCtrlEntity(entity, entity.getGroupName());
newEntity.fillDefaultValue();
- newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
- entity.getDisableReason(), entity.isEnableResCheck(),
+ newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableResCheck(),
entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo());
metaStoreService.addGroupResCtrlConf(newEntity, sBuffer, result);
} else {
newEntity = curEntity.clone();
newEntity.updBaseModifyInfo(entity);
- if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableConsume(),
- entity.getDisableReason(), entity.isEnableResCheck(),
+ if (newEntity.updModifyInfo(entity.getDataVerId(), entity.isEnableResCheck(),
entity.getAllowedBrokerClientRate(), entity.getQryPriorityId(),
entity.isFlowCtrlEnable(), entity.getRuleCnt(), entity.getFlowCtrlInfo())) {
metaStoreService.updGroupResCtrlConf(newEntity, sBuffer, result);
@@ -1827,28 +1959,32 @@ public class MetaDataManager implements Server {
String groupName, String topicName,
Boolean enableCsm, String disableRsn,
Boolean enableFlt, String fltCondStr,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
GroupConsumeCtrlEntity entity =
new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
entity.updModifyInfo(opEntity.getDataVerId(),
enableCsm, disableRsn, enableFlt, fltCondStr);
- return addOrUpdGroupConsumeCtrlInfo(isAddOp, entity, sBuilder, result);
+ return addOrUpdGroupConsumeCtrlInfo(isAddOp, entity, sBuffer, result);
}
public GroupProcessResult addOrUpdGroupConsumeCtrlInfo(boolean isAddOp,
GroupConsumeCtrlEntity entity,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
// add group resource control record
- if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuilder, result)) {
+ if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuffer, result)) {
+ return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
+ }
+ if (!addIfAbsentTopicCtrlConf(entity.getTopicName(),
+ entity.getModifyUser(), sBuffer, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
GroupConsumeCtrlEntity curEntity =
metaStoreService.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
if (isAddOp) {
if (curEntity == null) {
- metaStoreService.addGroupConsumeCtrlConf(entity, sBuilder, result);
+ metaStoreService.addGroupConsumeCtrlConf(entity, sBuffer, result);
} else {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
DataOpErrCode.DERR_EXISTED.getDescription());
@@ -1863,7 +1999,7 @@ public class MetaDataManager implements Server {
if (newEntity.updModifyInfo(entity.getDataVerId(),
entity.isEnableConsume(), entity.getDisableReason(),
entity.isEnableFilterConsume(), entity.getFilterCondStr())) {
- metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuilder, result);
+ metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuffer, result);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
DataOpErrCode.DERR_UNCHANGED.getDescription());
@@ -1873,10 +2009,60 @@ public class MetaDataManager implements Server {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
+ // Attention: compatible implementation for the old API; builds the entity and delegates to the entity-based overload
+ public GroupProcessResult addOrUpdGroupConsumeCtrlInfo(BaseEntity opEntity, String groupName,
+ String topicName, Boolean enableCsm,
+ String disReason, Boolean enableFlt,
+ String fltCondStr, StringBuilder sBuffer,
+ ProcessResult result) {
+ GroupConsumeCtrlEntity entity =
+ new GroupConsumeCtrlEntity(opEntity, groupName, topicName);
+ entity.updModifyInfo(opEntity.getDataVerId(),
+ enableCsm, disReason, enableFlt, fltCondStr);
+ return addOrUpdGroupConsumeCtrlInfo(entity, sBuffer, result);
+ }
+
+ // Attention: compatible implementation for the old API (add if absent, else update; an unchanged record is reported as success)
+ public GroupProcessResult addOrUpdGroupConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ // add group resource control record if absent; on failure return the failed result as-is
+ if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuffer, result)) {
+ return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
+ }
+ // add topic control record (addIfAbsentTopicCtrlConf); on failure return the failed result as-is
+ if (!addIfAbsentTopicCtrlConf(entity.getTopicName(),
+ entity.getModifyUser(), sBuffer, result)) {
+ return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
+ }
+ GroupConsumeCtrlEntity newEntity;
+ GroupConsumeCtrlEntity curEntity =
+ metaStoreService.getGroupConsumeCtrlConfByRecKey(entity.getRecordKey());
+ if (curEntity == null) {
+ newEntity = new GroupConsumeCtrlEntity(entity,
+ entity.getGroupName(), entity.getTopicName());
+ newEntity.updModifyInfo(entity.getDataVerId(),
+ entity.isEnableConsume(), entity.getDisableReason(),
+ entity.isEnableFilterConsume(), entity.getFilterCondStr());
+ metaStoreService.addGroupConsumeCtrlConf(newEntity, sBuffer, result);
+ } else {
+ newEntity = curEntity.clone();
+ newEntity.updBaseModifyInfo(entity);
+ if (newEntity.updModifyInfo(entity.getDataVerId(),
+ entity.isEnableConsume(), entity.getDisableReason(),
+ entity.isEnableFilterConsume(), entity.getFilterCondStr())) {
+ metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuffer, result);
+ } else {
+ result.setSuccResult(null);
+ }
+ }
+ return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
+ }
+
public GroupProcessResult modGroupConsumeCtrlInfo(BaseEntity opEntity, String groupName,
String topicName, Boolean enableCsm,
String disableRsn, Boolean enableFilter,
- String filterCondStr, StringBuilder sBuilder,
+ String filterCondStr, StringBuilder sBuffer,
ProcessResult result) {
GroupConsumeCtrlEntity curEntity =
metaStoreService.getConsumeCtrlByGroupAndTopic(groupName, topicName);
@@ -1889,7 +2075,7 @@ public class MetaDataManager implements Server {
newEntity.updBaseModifyInfo(opEntity);
if (newEntity.updModifyInfo(opEntity.getDataVerId(),
enableCsm, disableRsn, enableFilter, filterCondStr)) {
- metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuilder, result);
+ metaStoreService.updGroupConsumeCtrlConf(newEntity, sBuffer, result);
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
DataOpErrCode.DERR_UNCHANGED.getDescription());
@@ -1898,7 +2084,7 @@ public class MetaDataManager implements Server {
}
public GroupProcessResult modGroupConsumeCtrlInfo(GroupConsumeCtrlEntity entity,
- StringBuilder sBuilder,
+ StringBuilder sBuffer,
ProcessResult result) {
GroupConsumeCtrlEntity curEntity =
metaStoreService.getConsumeCtrlByGroupAndTopic(
@@ -1908,17 +2094,21 @@ public class MetaDataManager implements Server {
DataOpErrCode.DERR_NOT_EXIST.getDescription());
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuilder, result)) {
+ if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuffer, result)) {
+ return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
+ }
+ if (!addIfAbsentTopicCtrlConf(entity.getTopicName(),
+ entity.getModifyUser(), sBuffer, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- metaStoreService.updGroupConsumeCtrlConf(entity, sBuilder, result);
+ metaStoreService.updGroupConsumeCtrlConf(entity, sBuffer, result);
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
public List<GroupProcessResult> delGroupConsumeCtrlConf(String operator,
Set<String> groupNameSet,
Set<String> topicNameSet,
- StringBuilder strBuffer,
+ StringBuilder sBuffer,
ProcessResult result) {
List<GroupProcessResult> retInfo = new ArrayList<>();
if ((groupNameSet == null || groupNameSet.isEmpty())
@@ -1935,7 +2125,7 @@ public class MetaDataManager implements Server {
for (String recKey : rmvRecords) {
Tuple2<String, String> groupTopicTuple =
KeyBuilderUtils.splitRecKey2GroupTopic(recKey);
- metaStoreService.delGroupConsumeCtrlConf(operator, recKey, strBuffer, result);
+ metaStoreService.delGroupConsumeCtrlConf(operator, recKey, sBuffer, result);
retInfo.add(new GroupProcessResult(groupTopicTuple.getF1(),
groupTopicTuple.getF0(), result));
}
@@ -1943,7 +2133,7 @@ public class MetaDataManager implements Server {
}
private boolean addIfAbsentGroupResConf(BaseEntity opEntity, String groupName,
- StringBuilder sBuilder, ProcessResult result) {
+ StringBuilder sBuffer, ProcessResult result) {
GroupResCtrlEntity resCtrlEntity =
this.metaStoreService.getGroupResCtrlConf(groupName);
if (resCtrlEntity != null) {
@@ -1952,7 +2142,7 @@ public class MetaDataManager implements Server {
}
resCtrlEntity = new GroupResCtrlEntity(opEntity, groupName);
resCtrlEntity.fillDefaultValue();
- return this.metaStoreService.addGroupResCtrlConf(resCtrlEntity, sBuilder, result);
+ return this.metaStoreService.addGroupResCtrlConf(resCtrlEntity, sBuffer, result);
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
index 17d0b43..97b79c2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/BdbMetaStoreServiceImpl.java
@@ -821,7 +821,7 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
public boolean delGroupConsumeCtrlConf(String operator,
String groupName,
String topicName,
- StringBuilder strBuffer,
+ StringBuilder sBuffer,
ProcessResult result) {
// check current status
if (groupName == null && topicName == null) {
@@ -832,18 +832,18 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
return result.isSuccess();
}
if (groupConsumeCtrlMapper.delGroupConsumeCtrlConf(groupName, topicName, result)) {
- strBuffer.append("[delGroupConsumeCtrlConf], ").append(operator)
+ sBuffer.append("[delGroupConsumeCtrlConf], ").append(operator)
.append(" deleted group consume control record by index : ")
.append("groupName=").append(groupName)
.append(", topicName=").append(topicName);
- logger.info(strBuffer.toString());
+ logger.info(sBuffer.toString());
} else {
- strBuffer.append("[delGroupConsumeCtrlConf], ")
+ sBuffer.append("[delGroupConsumeCtrlConf], ")
.append("failure to delete group consume control record : ")
.append(result.getErrInfo());
- logger.warn(strBuffer.toString());
+ logger.warn(sBuffer.toString());
}
- strBuffer.delete(0, strBuffer.length());
+ sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
@@ -896,6 +896,11 @@ public class BdbMetaStoreServiceImpl implements MetaStoreService {
}
@Override
+ public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+ return groupConsumeCtrlMapper.getConsumeCtrlByGroupName(groupName);
+ }
+
+ @Override
public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet) {
return groupConsumeCtrlMapper.getConsumeCtrlKeyByTopicName(topicSet);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
index 2312058..21569c3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/MetaStoreService.java
@@ -203,7 +203,7 @@ public interface MetaStoreService extends KeepAlive, Server {
*
* @param operator operator
* @param topicName the topicName will be deleted
- * @param strBuffer the print info string buffer
+ * @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
@@ -302,7 +302,7 @@ public interface MetaStoreService extends KeepAlive, Server {
boolean delGroupConsumeCtrlConf(String operator,
String groupName,
String topicName,
- StringBuilder strBuffer,
+ StringBuilder sBuffer,
ProcessResult result);
/**
* Delete group consume control configure
@@ -331,6 +331,8 @@ public interface MetaStoreService extends KeepAlive, Server {
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
+ List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 65bbaeb..8217bb7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -85,16 +85,6 @@ public class BaseEntity implements Serializable, Cloneable {
public boolean updBaseModifyInfo(BaseEntity opInfoEntity) {
boolean changed = false;
- if (TStringUtils.isNotBlank(opInfoEntity.getCreateUser())
- && !Objects.equals(createUser, opInfoEntity.getCreateUser())) {
- changed = true;
- this.createUser = opInfoEntity.getCreateUser();
- }
- if (opInfoEntity.getCreateDate() != null
- && !Objects.equals(createDate, opInfoEntity.getCreateDate())) {
- changed = true;
- this.setCreateDate(opInfoEntity.getCreateDate());
- }
if (TStringUtils.isNotBlank(opInfoEntity.getModifyUser())
&& !Objects.equals(modifyUser, opInfoEntity.getModifyUser())) {
changed = true;
@@ -113,6 +103,28 @@ public class BaseEntity implements Serializable, Cloneable {
return changed;
}
+ public boolean updQueryKeyInfo(long newDataVerId,
+ String newCreateUser,
+ String newModifyUser) {
+ boolean changed = false;
+ // check and set dataVersionId field
+ if (newDataVerId != TBaseConstants.META_VALUE_UNDEFINED
+ && this.dataVersionId != newDataVerId) {
+ changed = true;
+ this.dataVersionId = newDataVerId;
+ }
+ if (TStringUtils.isNotBlank(newCreateUser)
+ && !Objects.equals(createUser, newCreateUser)) {
+ changed = true;
+ this.createUser = newCreateUser;
+ }
+ if (TStringUtils.isNotBlank(newModifyUser)
+ && !Objects.equals(modifyUser, newModifyUser)) {
+ changed = true;
+ this.modifyUser = newModifyUser;
+ }
+ return changed;
+ }
public boolean updBaseModifyInfo(long newDataVerId, String newCreateUser,
Date newCreateDate, String newModifyUser,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 483320e..fe2b6b6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -301,6 +301,34 @@ public class ClusterSettingEntity extends BaseEntity implements Cloneable {
return sBuilder;
}
+ /**
+ * Serialize field to old version json format
+ *
+ * Appends an opening brace, a numeric statusId (1 when the global flow control
+ * status is enabled, otherwise 0), the max message size, the query priority
+ * id, the flow control rule count and the flow control rule info (appended
+ * as-is, without quotes), then the base entity fields via super.toWebJsonStr,
+ * and finally the closing brace.
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return the sBuilder passed in, with the json text appended
+ */
+ public StringBuilder toOldVerFlowCtrlWebJsonStr(StringBuilder sBuilder,
+ boolean isLongName) {
+ int statusId = gloFlowCtrlStatus.isEnable() ? 1 : 0;
+ if (isLongName) {
+ sBuilder.append("{\"statusId\":").append(statusId)
+ .append(",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
+ .append(",\"qryPriorityId\":").append(qryPriorityId)
+ .append(",\"ruleCnt\":").append(gloFlowCtrlRuleCnt)
+ .append(",\"flowCtrlInfo\":").append(gloFlowCtrlRuleInfo);
+ } else {
+ sBuilder.append("{\"statusId\":").append(statusId)
+ .append(",\"mxMsgInMB\":").append(maxMsgSizeInMB)
+ .append(",\"qryPriId\":").append(qryPriorityId)
+ .append(",\"fCtrlCnt\":").append(gloFlowCtrlRuleCnt)
+ .append(",\"fCtrlInfo\":").append(gloFlowCtrlRuleInfo);
+ }
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 8de8881..1e7a3b9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -130,7 +130,7 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
return consumeEnable.isEnable();
}
- private void setConsumeEnable(boolean enableConsume) {
+ public void setConsumeEnable(boolean enableConsume) {
if (enableConsume) {
this.consumeEnable = EnableStatus.STATUS_ENABLE;
} else {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
index a922c15..35f9b96 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java
@@ -32,9 +32,6 @@ import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntit
public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
// group name
private String groupName = "";
- // Enable consume control, global control
- private EnableStatus consumeEnable = EnableStatus.STATUS_UNDEFINE;
- private String disableReason = "";
// resource check control
private EnableStatus resCheckStatus = EnableStatus.STATUS_UNDEFINE;
private int allowedBrokerClientRate = TBaseConstants.META_VALUE_UNDEFINED;
@@ -55,20 +52,12 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.groupName = groupName;
}
- public GroupResCtrlEntity(String groupName,
- boolean enableConsume, String disableRsn,
- boolean enableFlowCtrl,
- int qryPriorityId, int ruleCnt,
+ public GroupResCtrlEntity(String groupName, int qryPriorityId,
+ boolean enableFlowCtrl, int ruleCnt,
String flowCtrlInfo, String createUser,
Date createDate) {
super(createUser, createDate);
this.groupName = groupName;
- if (enableConsume) {
- this.consumeEnable = EnableStatus.STATUS_ENABLE;
- } else {
- this.consumeEnable = EnableStatus.STATUS_DISABLE;
- }
- this.disableReason = disableRsn;
if (enableFlowCtrl) {
this.flowCtrlStatus = EnableStatus.STATUS_ENABLE;
} else {
@@ -83,8 +72,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
super(bdbEntity.getSerialId(),
bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
this.groupName = bdbEntity.getGroupName();
- this.consumeEnable = bdbEntity.getConsumeEnable();
- this.disableReason = bdbEntity.getDisableConsumeReason();
this.qryPriorityId = bdbEntity.getQryPriorityId();
this.ruleCnt = bdbEntity.getRuleCnt();
this.flowCtrlInfo = bdbEntity.getFlowCtrlInfo();
@@ -103,8 +90,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
new BdbGroupFlowCtrlEntity(getDataVerId(), this.groupName,
this.flowCtrlInfo, statusId, this.ruleCnt, this.qryPriorityId,
getAttributes(), getCreateUser(), getCreateDate());
- bdbEntity.setConsumeEnable(consumeEnable);
- bdbEntity.setDisableConsumeReason(disableReason);
bdbEntity.setResCheckStatus(resCheckStatus);
bdbEntity.setAllowedBrokerClientRate(allowedBrokerClientRate);
return bdbEntity;
@@ -116,8 +101,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
* @return object
*/
public GroupResCtrlEntity fillDefaultValue() {
- this.consumeEnable = EnableStatus.STATUS_ENABLE;
- this.disableReason = "";
this.resCheckStatus = EnableStatus.STATUS_DISABLE;
this.allowedBrokerClientRate = 0;
this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
@@ -183,22 +166,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.flowCtrlStatus = flowCtrlStatus;
}
- public boolean isEnableConsume() {
- return consumeEnable.isEnable();
- }
-
- public EnableStatus getConsumeEnable() {
- return consumeEnable;
- }
-
- public String getDisableReason() {
- return disableReason;
- }
-
- public void setDisableReason(String disableReason) {
- this.disableReason = disableReason;
- }
-
private void setResCheckStatus(boolean enableResChk) {
if (enableResChk) {
this.resCheckStatus = EnableStatus.STATUS_ENABLE;
@@ -220,20 +187,12 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.flowCtrlInfo = flowCtrlInfo;
}
- public void setConsumeEnable(boolean enableConsume) {
- if (enableConsume) {
- this.consumeEnable = EnableStatus.STATUS_ENABLE;
- } else {
- this.consumeEnable = EnableStatus.STATUS_DISABLE;
- }
- }
-
/**
* update subclass field values
*
* @return if changed
*/
- public boolean updModifyInfo(long dataVerId, Boolean enableConsume, String disableRsn,
+ public boolean updModifyInfo(long dataVerId,
Boolean resChkEnable, int allowedB2CRate,
int qryPriorityId, Boolean flowCtrlEnable,
int flowRuleCnt, String flowCtrlRuleInfo) {
@@ -245,18 +204,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
this.setDataVersionId(dataVerId);
}
// check and set resCheckStatus info
- if (enableConsume != null
- && this.consumeEnable.isEnable() != enableConsume) {
- changed = true;
- setConsumeEnable(enableConsume);
- }
- // check and set disableReason info
- if (TStringUtils.isNotBlank(disableRsn)
- && !disableRsn.equals(disableReason)) {
- changed = true;
- this.disableReason = disableRsn;
- }
- // check and set resCheckStatus info
if (resChkEnable != null
&& this.resCheckStatus.isEnable() != resChkEnable) {
changed = true;
@@ -310,8 +257,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
&& target.getQryPriorityId() != this.qryPriorityId)
|| (TStringUtils.isNotBlank(target.getGroupName())
&& !target.getGroupName().equals(this.groupName))
- || (target.getConsumeEnable() != EnableStatus.STATUS_UNDEFINE
- && target.getConsumeEnable() != this.consumeEnable)
|| (target.getResCheckStatus() != EnableStatus.STATUS_UNDEFINE
&& target.getResCheckStatus() != this.resCheckStatus)
|| (target.getFlowCtrlStatus() != EnableStatus.STATUS_UNDEFINE
@@ -326,18 +271,16 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
/**
* Serialize field to json format
*
- * @param sBuilder build container
+ * @param sBuffer build container
* @param isLongName if return field key is long name
* @param fullFormat if return full format json
* @return the buffer passed in, with the json text appended
*/
- public StringBuilder toWebJsonStr(StringBuilder sBuilder,
+ public StringBuilder toWebJsonStr(StringBuilder sBuffer,
boolean isLongName,
boolean fullFormat) {
if (isLongName) {
- sBuilder.append("{\"groupName\":\"").append(groupName).append("\"")
- .append(",\"consumeEnable\":").append(consumeEnable.isEnable())
- .append(",\"reason\":\"").append(disableReason).append("\"")
+ sBuffer.append("{\"groupName\":\"").append(groupName).append("\"")
.append(",\"resCheckEnable\":").append(resCheckStatus.isEnable())
.append(",\"alwdBrokerClientRate\":").append(allowedBrokerClientRate)
.append(",\"qryPriorityId\":").append(qryPriorityId)
@@ -345,9 +288,7 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
.append(",\"flowCtrlRuleCount\":").append(ruleCnt)
.append(",\"flowCtrlInfo\":").append(flowCtrlInfo);
} else {
- sBuilder.append("{\"group\":\"").append(groupName).append("\"")
- .append(",\"csmEn\":").append(consumeEnable.isEnable())
- .append(",\"rsn\":\"").append(disableReason).append("\"")
+ sBuffer.append("{\"group\":\"").append(groupName).append("\"")
.append(",\"resChkEn\":").append(resCheckStatus.isEnable())
.append(",\"abcr\":").append(allowedBrokerClientRate)
.append(",\"qryPriId\":").append(qryPriorityId)
@@ -355,11 +296,39 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
.append(",\"fCtrlCnt\":").append(ruleCnt)
.append(",\"fCtrlInfo\":").append(flowCtrlInfo);
}
- super.toWebJsonStr(sBuilder, isLongName);
+ super.toWebJsonStr(sBuffer, isLongName);
if (fullFormat) {
- sBuilder.append("}");
+ sBuffer.append("}");
+ }
+ return sBuffer;
+ }
+
+ /**
+ * Serialize field to old version json format
+ *
+ * @param sBuffer build container
+ * @param isLongName if return field key is long name
+ * @return the sBuffer passed in, with the serialized record appended
+ */
+ public StringBuilder toOldVerFlowCtrlWebJsonStr(StringBuilder sBuffer,
+ boolean isLongName) {
+ int statusId = flowCtrlStatus.isEnable() ? 1 : 0;
+ if (isLongName) {
+ sBuffer.append("{\"groupName\":\"").append(groupName)
+ .append("\",\"statusId\":").append(statusId)
+ .append(",\"qryPriorityId\":").append(qryPriorityId)
+ .append(",\"ruleCnt\":").append(ruleCnt)
+ .append(",\"flowCtrlInfo\":").append(flowCtrlInfo);
+ } else {
+ sBuffer.append("{\"group\":\"").append(groupName)
+ .append("\",\"statusId\":").append(statusId)
+ .append(",\"qryPriId\":").append(qryPriorityId)
+ .append(",\"fCtrlCnt\":").append(ruleCnt)
+ .append(",\"fCtrlInfo\":").append(flowCtrlInfo);
}
- return sBuilder;
+ super.toWebJsonStr(sBuffer, isLongName);
+ sBuffer.append("}");
+ return sBuffer;
}
/**
@@ -373,8 +342,6 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
&& qryPriorityId == other.qryPriorityId
&& ruleCnt == other.ruleCnt
&& groupName.equals(other.groupName)
- && consumeEnable == other.consumeEnable
- && Objects.equals(disableReason, other.disableReason)
&& resCheckStatus == other.resCheckStatus
&& flowCtrlStatus == other.flowCtrlStatus
&& Objects.equals(flowCtrlInfo, other.flowCtrlInfo);
@@ -397,15 +364,14 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), groupName, consumeEnable,
- disableReason, resCheckStatus, allowedBrokerClientRate,
+ return Objects.hash(super.hashCode(), groupName,
+ resCheckStatus, allowedBrokerClientRate,
qryPriorityId, flowCtrlStatus, ruleCnt, flowCtrlInfo);
}
@Override
public GroupResCtrlEntity clone() {
GroupResCtrlEntity copy = (GroupResCtrlEntity) super.clone();
- copy.setConsumeEnable(getConsumeEnable().isEnable());
copy.setFlowCtrlStatus(getFlowCtrlStatus());
copy.setResCheckStatus(getResCheckStatus());
return copy;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
index 713ad26..e766f05 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/mapper/GroupConsumeCtrlMapper.java
@@ -46,6 +46,8 @@ public interface GroupConsumeCtrlMapper extends AbstractMapper {
List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String topicName);
+ List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName);
+
Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicSet);
Set<String> getConsumeCtrlKeyByGroupName(Set<String> groupSet);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 171f970..6b9f732 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -245,6 +245,24 @@ public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
}
@Override
+ public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String groupName) {
+ ConcurrentHashSet<String> keySet =
+ grpConsumeCtrlGroupCache.get(groupName);
+ if (keySet == null || keySet.isEmpty()) {
+ return Collections.emptyList();
+ }
+ GroupConsumeCtrlEntity entity;
+ List<GroupConsumeCtrlEntity> result = new ArrayList<>();
+ for (String recordKey : keySet) {
+ entity = grpConsumeCtrlCache.get(recordKey);
+ if (entity != null) {
+ result.add(entity);
+ }
+ }
+ return result;
+ }
+
+ @Override
public Set<String> getConsumeCtrlKeyByTopicName(Set<String> topicNameSet) {
ConcurrentHashSet<String> qrySet;
Set<String> retResult = new HashSet<>();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index bc1ce33..d6a8b24 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -17,26 +17,23 @@
package org.apache.tubemq.server.master.web.handler;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.policies.FlowCtrlItem;
-import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
-import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
-
-
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+@Deprecated
public class WebAdminFlowRuleHandler extends AbstractWebHandler {
private static final String blankFlowCtrlRules = "[]";
@@ -52,145 +49,102 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
@Override
public void registerWebApiMethod() {
// register query method
- registerQueryWebMethod("admin_query_def_flow_control_rule",
- "adminQueryDefGroupFlowCtrlRule");
registerQueryWebMethod("admin_query_group_flow_control_rule",
- "adminQuerySpecGroupFlowCtrlRule");
+ "adminQueryGroupFlowCtrlRule");
// register modify method
- registerModifyWebMethod("admin_set_def_flow_control_rule",
- "adminSetDefGroupFlowCtrlRule");
registerModifyWebMethod("admin_set_group_flow_control_rule",
- "adminSetSpecGroupFlowCtrlRule");
- registerModifyWebMethod("admin_rmv_def_flow_control_rule",
- "adminDelDefGroupFlowCtrlRuleStatus");
+ "adminSetGroupFlowCtrlRule");
registerModifyWebMethod("admin_rmv_group_flow_control_rule",
- "adminDelSpecGroupFlowCtrlRuleStatus");
- registerModifyWebMethod("admin_upd_def_flow_control_rule",
- "adminModDefGroupFlowCtrlRuleStatus");
+ "adminDelGroupFlowCtrlRule");
registerModifyWebMethod("admin_upd_group_flow_control_rule",
- "adminModSpecGroupFlowCtrlRuleStatus");
- }
-
- public StringBuilder adminQueryDefGroupFlowCtrlRule(HttpServletRequest req) {
- return innQueryGroupFlowCtrlRule(req, true);
- }
-
- public StringBuilder adminQuerySpecGroupFlowCtrlRule(HttpServletRequest req) {
- return innQueryGroupFlowCtrlRule(req, false);
- }
-
- public StringBuilder adminSetDefGroupFlowCtrlRule(HttpServletRequest req) {
- return innSetFlowControlRule(req, true);
- }
-
- public StringBuilder adminSetSpecGroupFlowCtrlRule(HttpServletRequest req) {
- return innSetFlowControlRule(req, false);
- }
-
- public StringBuilder adminDelDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
- return innDelGroupFlowCtrlRuleStatus(req, true);
- }
-
- public StringBuilder adminDelSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
- return innDelGroupFlowCtrlRuleStatus(req, false);
- }
-
- public StringBuilder adminModDefGroupFlowCtrlRuleStatus(HttpServletRequest req) {
- return innModGroupFlowCtrlRuleStatus(req, true);
- }
-
- public StringBuilder adminModSpecGroupFlowCtrlRuleStatus(HttpServletRequest req) {
- return innModGroupFlowCtrlRuleStatus(req, false);
+ "adminUpdGroupFlowCtrlRule");
}
/**
- * add flow control rule
+ * query group flow control rule
*
* @param req
- * @param do4DefFlowCtrl
* @return
*/
- private StringBuilder innSetFlowControlRule(HttpServletRequest req,
- boolean do4DefFlowCtrl) {
+ public StringBuilder adminQueryGroupFlowCtrlRule(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ // build query entity
+ GroupResCtrlEntity entity = new GroupResCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, entity, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- // get createUser info
+ // get group list
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.CREATEUSER, true, null, sBuffer, result)) {
+ WebFieldDef.COMPSGROUPNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String createUser = (String) result.retData1;
- // check and get create date
- if (!WebParameterUtils.getDateParameter(req,
- WebFieldDef.CREATEDATE, false, new Date(), sBuffer, result)) {
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // get and valid qryPriorityId info
+ if (!WebParameterUtils.getQryPriorityIdParameter(req,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.QRY_PRIORITY_MIN_VALUE, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Date createDate = (Date) result.retData1;
- // get rule required status info
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.STATUSID, false, 0, 0, sBuffer, result)) {
+ int inQryPriorityId = (int) result.getRetData();
+ // get flowCtrlEnable's statusId info
+ if (!WebParameterUtils.getFlowCtrlStatusParamValue(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- int statusId = (int) result.retData1;
- // get and valid priority info
- if (!getQryPriorityIdWithCheck(req, false, 301, 101, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int qryPriorityId = (int) result.retData1;
- // get group name info
- if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // get and flow control rule info
- int ruleCnt = getAndCheckFlowRules(req, blankFlowCtrlRules, result);
- if (!result.success) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- String flowCtrlInfo = (String) result.retData1;
- try {
- // add flow control to bdb
- for (String groupName : batchGroupNames) {
- if (groupName.equals(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
- brokerConfManager.confAddBdbGroupFlowCtrl(
- new BdbGroupFlowCtrlEntity(flowCtrlInfo,
- statusId, ruleCnt, qryPriorityId, "",
- false, createUser, createDate));
- } else {
- brokerConfManager.confAddBdbGroupFlowCtrl(
- new BdbGroupFlowCtrlEntity(groupName,
- flowCtrlInfo, statusId, ruleCnt, qryPriorityId, "",
- false, createUser, createDate));
- }
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
+ entity.updModifyInfo(entity.getDataVerId(), null,
+ TBaseConstants.META_VALUE_UNDEFINED, inQryPriorityId,
+ flowCtrlEnable, TBaseConstants.META_VALUE_UNDEFINED, null);
+ Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
+ metaDataManager.confGetGroupResCtrlConf(groupNameSet, entity);
+ // build return result
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (GroupResCtrlEntity resCtrlEntity : groupResCtrlEntityMap.values()) {
+ if (resCtrlEntity == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
}
- WebParameterUtils.buildSuccessResult(sBuffer);
- } catch (Exception e) {
- WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+ sBuffer = resCtrlEntity.toOldVerFlowCtrlWebJsonStr(sBuffer, true);
}
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
}
/**
- * delete flow control rule
+ * add group flow control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminSetGroupFlowCtrlRule(HttpServletRequest req) {
+ return innAddOrUpdGroupFlowCtrlRule(req, true);
+ }
+
+ /**
+ * modify group flow control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminUpdGroupFlowCtrlRule(HttpServletRequest req) {
+ return innAddOrUpdGroupFlowCtrlRule(req, false);
+ }
+
+ /**
+ * delete group flow control rule
*
* @param req
- * @param do4DefFlowCtrl
* @return
*/
- private StringBuilder innDelGroupFlowCtrlRuleStatus(HttpServletRequest req,
- boolean do4DefFlowCtrl) {
+ public StringBuilder adminDelGroupFlowCtrlRule(HttpServletRequest req) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
@@ -199,45 +153,36 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- // get modifyUser info
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.CREATEUSER, true, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- String modifyUser = (String) result.retData1;
- // check and get modifyDate date
- if (!WebParameterUtils.getDateParameter(req,
- WebFieldDef.CREATEDATE, false, new Date(), sBuffer, result)) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Date modifyDate = (Date) result.retData1;
- // get group name info
- if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, sBuffer, result)) {
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- try {
- brokerConfManager.confDeleteBdbGroupFlowCtrl(batchGroupNames);
- WebParameterUtils.buildSuccessResult(sBuffer);
- } catch (Exception e) {
- WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // reset each group's record: default priority, flow control disabled, blank rules
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (String groupName : groupNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(opEntity, groupName,
+ TServerConstants.QRY_PRIORITY_DEF_VALUE, Boolean.FALSE,
+ 0, TServerConstants.BLANK_FLOWCTRL_RULES, sBuffer, result));
}
- return sBuffer;
+ return buildRetInfo(retInfoList, sBuffer);
}
/**
- * modify flow control rule
+ * add or modify flow control rule
*
* @param req
- * @param do4DefFlowCtrl
* @return
*/
- private StringBuilder innModGroupFlowCtrlRuleStatus(HttpServletRequest req,
- boolean do4DefFlowCtrl) {
- // #lizard forgives
+ private StringBuilder innAddOrUpdGroupFlowCtrlRule(HttpServletRequest req, boolean isAddOp) {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// valid operation authorize info
@@ -246,277 +191,67 @@ public class WebAdminFlowRuleHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- // get modifyUser info
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.CREATEUSER, true, null, sBuffer, result)) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String modifyUser = (String) result.retData1;
- // check and get modifyDate date
- if (!WebParameterUtils.getDateParameter(req,
- WebFieldDef.CREATEDATE, false, new Date(), sBuffer, result)) {
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Date modifyDate = (Date) result.retData1;
- // get group name info
- if (!getGroupNameWithCheck(req, true, do4DefFlowCtrl, sBuffer, result)) {
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // get and valid qryPriorityId info
+ if (!WebParameterUtils.getQryPriorityIdParameter(req,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.QRY_PRIORITY_MIN_VALUE, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // get rule required status info
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.STATUSID,
- false, TBaseConstants.META_VALUE_UNDEFINED, 0, sBuffer, result)) {
+ int qryPriorityId = (int) result.getRetData();
+ // get flowCtrlEnable's statusId info
+ if (!WebParameterUtils.getFlowCtrlStatusParamValue(req,
+ false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- int statusId = (int) result.retData1;
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
// get and flow control rule info
- int ruleCnt = getAndCheckFlowRules(req, null, result);
+ int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req,
+ (isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), sBuffer, result);
if (!result.success) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String newFlowCtrlInfo = (String) result.retData1;
- // get and valid priority info
- if (!getQryPriorityIdWithCheck(req, false,
- TBaseConstants.META_VALUE_UNDEFINED, 101, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int qryPriorityId = (int) result.retData1;
- try {
- boolean foundChange;
- for (String groupName : batchGroupNames) {
- // check if record changed
- BdbGroupFlowCtrlEntity oldEntity =
- brokerConfManager.getBdbGroupFlowCtrl(groupName);
- if (oldEntity != null) {
- foundChange = false;
- BdbGroupFlowCtrlEntity newGroupFlowCtrlEntity =
- new BdbGroupFlowCtrlEntity(oldEntity.getGroupName(),
- oldEntity.getFlowCtrlInfo(), oldEntity.getStatusId(),
- oldEntity.getRuleCnt(), oldEntity.getAttributes(),
- oldEntity.getSsdTranslateId(), oldEntity.isNeedSSDProc(),
- oldEntity.getCreateUser(), oldEntity.getCreateDate());
- if (statusId != TBaseConstants.META_VALUE_UNDEFINED
- && statusId != oldEntity.getStatusId()) {
- foundChange = true;
- newGroupFlowCtrlEntity.setStatusId(statusId);
- }
- if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
- && qryPriorityId != oldEntity.getQryPriorityId()) {
- foundChange = true;
- newGroupFlowCtrlEntity.setQryPriorityId(qryPriorityId);
- }
- if (TStringUtils.isNotBlank(newFlowCtrlInfo)
- && !newFlowCtrlInfo.equals(oldEntity.getFlowCtrlInfo())) {
- foundChange = true;
- newGroupFlowCtrlEntity.setFlowCtrlInfo(ruleCnt, newFlowCtrlInfo);
- }
- // update record if found change
- if (foundChange) {
- try {
- newGroupFlowCtrlEntity.setModifyInfo(modifyUser, modifyDate);
- brokerConfManager.confUpdateBdbGroupFlowCtrl(newGroupFlowCtrlEntity);
- } catch (Throwable ee) {
- //
- }
- }
- }
- }
- WebParameterUtils.buildSuccessResult(sBuffer);
- } catch (Exception e) {
- WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+ String flowCtrlInfo = (String) result.getRetData();
+ // add or modify records
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (String groupName : groupNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(opEntity, groupName,
+ qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuffer, result));
}
- return sBuffer;
+ return buildRetInfo(retInfoList, sBuffer);
}
- /**
- * query flow control rule
- *
- * @param req
- * @param do4DefFlowCtrl
- * @return
- */
- private StringBuilder innQueryGroupFlowCtrlRule(HttpServletRequest req,
- boolean do4DefFlowCtrl) {
- ProcessResult result = new ProcessResult();
- StringBuilder sBuffer = new StringBuilder(512);
- BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity = new BdbGroupFlowCtrlEntity();
- // get modifyUser info
- WebParameterUtils.getStringParamValue(req,
- WebFieldDef.CREATEUSER, false, null, sBuffer, result);
- bdbGroupFlowCtrlEntity.setCreateUser((String) result.retData1);
- // get status id info
- WebParameterUtils.getIntParamValue(req, WebFieldDef.STATUSID, false,
- TBaseConstants.META_VALUE_UNDEFINED, 0, sBuffer, result);
- bdbGroupFlowCtrlEntity.setStatusId((int) result.retData1);
- // get and valid priority info
- getQryPriorityIdWithCheck(req, false,
- TBaseConstants.META_VALUE_UNDEFINED, 101, sBuffer, result);
- bdbGroupFlowCtrlEntity.setQryPriorityId((int) result.retData1);
- getGroupNameWithCheck(req, false, do4DefFlowCtrl, sBuffer, result);
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // query group flow ctrl infos
- List<BdbGroupFlowCtrlEntity> webGroupFlowCtrlEntities =
- brokerConfManager.confGetBdbGroupFlowCtrl(bdbGroupFlowCtrlEntity);
+ private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
+ StringBuilder sBuffer) {
int totalCnt = 0;
- boolean found = false;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- if (do4DefFlowCtrl) {
- for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
- if (entity.getGroupName().equals(
- TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
- if (totalCnt++ > 0) {
- sBuffer.append(",");
- }
- sBuffer = entity.toJsonString(sBuffer);
- break;
- }
- }
- } else {
- for (BdbGroupFlowCtrlEntity entity : webGroupFlowCtrlEntities) {
- if (entity.getGroupName().equals(
- TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL)) {
- continue;
- }
- found = false;
- for (String tmpGroupName : batchGroupNames) {
- if (entity.getGroupName().equals(tmpGroupName)) {
- found = true;
- break;
- }
- }
- if (found) {
- if (totalCnt++ > 0) {
- sBuffer.append(",");
- }
- sBuffer = entity.toJsonString(sBuffer);
- }
+ for (GroupProcessResult entry : retInfo) {
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
}
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
+ .append(",\"success\":").append(entry.isSuccess())
+ .append(",\"errCode\":").append(entry.getErrCode())
+ .append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
}
- // translate rule info to json format string
- private int getAndCheckFlowRules(HttpServletRequest req,
- String defValue,
- ProcessResult result) {
- int ruleCnt = 0;
- StringBuilder strBuffer = new StringBuilder(512);
- // get parameter value
- String paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.name);
- if (paramValue == null) {
- paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.shortName);
- }
- if (TStringUtils.isBlank(paramValue)) {
- result.setSuccResult(defValue);
- return ruleCnt;
- }
- strBuffer.append("[");
- paramValue = paramValue.trim();
- List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
- FlowCtrlRuleHandler flowCtrlRuleHandler =
- new FlowCtrlRuleHandler(true);
- Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap;
- try {
- flowCtrlItemMap =
- flowCtrlRuleHandler.parseFlowCtrlInfo(paramValue);
- } catch (Throwable e) {
- result.setFailResult(new StringBuilder(512)
- .append("Parse parameter ").append(WebFieldDef.FLOWCTRLSET.name)
- .append(" failure: '").append(e.toString()).toString());
- return 0;
- }
- for (Integer typeId : ruleTypes) {
- if (typeId != null) {
- int rules = 0;
- List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
- if (flowCtrlItems != null) {
- if (ruleCnt++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"type\":").append(typeId.intValue()).append(",\"rule\":[");
- for (FlowCtrlItem flowCtrlItem : flowCtrlItems) {
- if (flowCtrlItem != null) {
- if (rules++ > 0) {
- strBuffer.append(",");
- }
- strBuffer = flowCtrlItem.toJsonString(strBuffer);
- }
- }
- strBuffer.append("]}");
- }
- }
- }
- strBuffer.append("]");
- result.setSuccResult(strBuffer.toString());
- return ruleCnt;
- }
-
- private boolean getGroupNameWithCheck(HttpServletRequest req, boolean required,
- boolean do4DefFlowCtrl, StringBuilder sBuffer,
- ProcessResult result) {
- if (do4DefFlowCtrl) {
- result.setSuccResult(rsvGroupNameSet);
- return true;
- }
- // get group list
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSGROUPNAME, required, null, sBuffer, result)) {
- return result.success;
- }
- Set<String> inGroupSet = (Set<String>) result.retData1;
- for (String rsvGroup : rsvGroupNameSet) {
- if (inGroupSet.contains(rsvGroup)) {
- result.setFailResult(sBuffer.append("Illegal value in ")
- .append(WebFieldDef.COMPSGROUPNAME.name).append(" parameter: '")
- .append(rsvGroup).append("' is a system reserved value!").toString());
- sBuffer.delete(0, sBuffer.length());
- return false;
- }
- }
- return true;
- }
-
- private boolean getQryPriorityIdWithCheck(HttpServletRequest req, boolean required,
- int defValue, int minValue,
- StringBuilder sBuffer, ProcessResult result) {
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.QRYPRIORITYID,
- required, defValue, minValue, sBuffer, result)) {
- return result.success;
- }
- int qryPriorityId = (int) result.retData1;
- if (qryPriorityId > 303 || qryPriorityId < 101) {
- result.setFailResult(sBuffer.append("Illegal value in ")
- .append(WebFieldDef.QRYPRIORITYID.name)
- .append(" parameter: ").append(WebFieldDef.QRYPRIORITYID.name)
- .append(" value must be greater than or equal")
- .append(" to 101 and less than or equal to 303!").toString());
- sBuffer.delete(0, sBuffer.length());
- return false;
- }
- if (!allowedPriorityVal.contains(qryPriorityId % 100)) {
- result.setFailResult(sBuffer.append("Illegal value in ")
- .append(WebFieldDef.QRYPRIORITYID.name).append(" parameter: the units of ")
- .append(WebFieldDef.QRYPRIORITYID.name).append(" must in ")
- .append(allowedPriorityVal).toString());
- sBuffer.delete(0, sBuffer.length());
- return false;
- }
- if (!allowedPriorityVal.contains(qryPriorityId / 100)) {
- result.setFailResult(sBuffer.append("Illegal value in ")
- .append(WebFieldDef.QRYPRIORITYID.name).append(" parameter: the hundreds of ")
- .append(WebFieldDef.QRYPRIORITYID.name).append(" must in ")
- .append(allowedPriorityVal).toString());
- sBuffer.delete(0, sBuffer.length());
- return false;
- }
- return true;
- }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index f587849..52ecd3f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -18,25 +18,17 @@
package org.apache.tubemq.server.master.web.handler;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
@@ -47,6 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+@Deprecated
public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
private static final Logger logger =
@@ -113,7 +107,7 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
ProcessResult result = new ProcessResult();
StringBuilder sBuffer = new StringBuilder(512);
// build query entity
- GroupResCtrlEntity entity = new GroupResCtrlEntity();
+ GroupConsumeCtrlEntity entity = new GroupConsumeCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
if (!WebParameterUtils.getQueriedOperateInfo(req, entity, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -125,24 +119,33 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// only query disable consume group
entity.setConsumeEnable(false);
- Map<String, GroupResCtrlEntity> qryResult =
- metaDataManager.confGetGroupResCtrlConf(groupNameSet, entity);
+ Map<String, List<GroupConsumeCtrlEntity>> qryResult =
+ metaDataManager.getGroupConsumeCtrlConf(groupNameSet, topicNameSet, entity);
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- for (GroupResCtrlEntity entry : qryResult.values()) {
- if (totalCnt++ > 0) {
- sBuffer.append(",");
+ for (List<GroupConsumeCtrlEntity> entryList : qryResult.values()) {
+ for (GroupConsumeCtrlEntity entry : entryList) {
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
+ .append(",\"reason\":\"").append(entry.getDisableReason()).append("\"")
+ .append(",\"dataVersionId\":").append(entry.getDataVerId())
+ .append(",\"createUser\":\"").append(entry.getCreateUser()).append("\"")
+ .append(",\"createDate\":\"").append(entry.getCreateDateStr()).append("\"")
+ .append(",\"modifyUser\":\"").append(entry.getModifyUser()).append("\"")
+ .append(",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
}
- sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
- .append(",\"reason\":\"").append(entry.getDisableReason()).append("\"")
- .append(",\"dataVersionId\":").append(entry.getDataVerId())
- .append(",\"createUser\":\"").append(entry.getCreateUser()).append("\"")
- .append(",\"createDate\":\"").append(entry.getCreateDateStr()).append("\"")
- .append(",\"modifyUser\":\"").append(entry.getModifyUser()).append("\"")
- .append(",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
@@ -170,14 +173,15 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ qryEntity.setConsumeEnable(true);
Map<String, List<GroupConsumeCtrlEntity>> qryResultMap =
metaDataManager.getGroupConsumeCtrlConf(groupNameSet, topicNameSet, qryEntity);
int totalCnt = 0;
@@ -228,14 +232,14 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// check and get condStatus field
if (!getCondStatusParamValue(req, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -247,7 +251,7 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> filterCondSet = (Set<String>) result.retData1;
+ Set<String> filterCondSet = (Set<String>) result.getRetData();
qryEntity.updModifyInfo(qryEntity.getDataVerId(),
null, null, filterEnable, null);
Map<String, List<GroupConsumeCtrlEntity>> qryResultMap =
@@ -310,7 +314,7 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// get group list
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.OLDALWDBCRATE, false,
@@ -320,9 +324,9 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
int allowedBClientRate = (int) result.getRetData();
// query matched records
- entity.updModifyInfo(entity.getDataVerId(), null, null,
- null, allowedBClientRate, TBaseConstants.META_VALUE_UNDEFINED,
- null, TBaseConstants.META_VALUE_UNDEFINED, null);
+ entity.updModifyInfo(entity.getDataVerId(), null, allowedBClientRate,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TBaseConstants.META_VALUE_UNDEFINED, null);
Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
metaDataManager.confGetGroupResCtrlConf(groupNameSet, entity);
// build return result
@@ -376,12 +380,21 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getAndValidTopicNameInfo(req,
+ metaDataManager, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// add black list records
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (String groupName : batchGroupNames) {
- retInfoList.add(metaDataManager.enOrDisConsumeCtrlConf(opEntity, groupName,
- Boolean.FALSE, "Old API Set", sBuffer, result));
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(opEntity, groupName,
+ topicName, Boolean.FALSE, "Old API Set", null, null, sBuffer, result));
+ }
}
return buildRetInfo(retInfoList, sBuffer);
}
@@ -408,16 +421,16 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
BaseEntity opEntity = (BaseEntity) result.getRetData();
// check and get groupNameJsonSet info
- if (!getGroupJsonSetInfo(req, opEntity, sBuffer, result)) {
+ if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.FALSE, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Map<String, GroupResCtrlEntity> addRecordMap =
- (Map<String, GroupResCtrlEntity>) result.getRetData();
+ Map<String, GroupConsumeCtrlEntity> addRecordMap =
+ (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
// add or update and buid result
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (GroupResCtrlEntity entry : addRecordMap.values()) {
- retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(entry, sBuffer, result));
+ for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(entry, sBuffer, result));
}
return buildRetInfo(retInfoList, sBuffer);
}
@@ -449,1014 +462,365 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // add disable black list records
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // add allowed consume records
List<GroupProcessResult> retInfoList = new ArrayList<>();
- for (String groupName : batchGroupNames) {
- retInfoList.add(metaDataManager.enOrDisConsumeCtrlConf(opEntity, groupName,
- Boolean.TRUE, "Old API Set", sBuffer, result));
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(opEntity, groupName,
+ topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
+
+ }
}
return buildRetInfo(retInfoList, sBuffer);
}
-
/**
- * Add group filter condition info
+ * Add authorized consumer group info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminAddGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- String topicName =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- Set<String> configuredTopicSet =
- brokerConfManager.getTotalConfiguredTopicNames();
- if (!configuredTopicSet.contains(topicName)) {
- throw new Exception(sBuilder.append("Topic: ").append(topicName)
- .append(" not configure in master's topic configure, please configure first!").toString());
- }
- final int filterCondStatus =
- WebParameterUtils.validIntDataParameter("condStatus",
- req.getParameter("condStatus"),
- false, 0, 0);
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- final String strNewFilterConds =
- WebParameterUtils.checkAndGetFilterConds(req.getParameter("filterConds"), true, sBuilder);
- BdbTopicAuthControlEntity topicAuthControlEntity =
- brokerConfManager.getBdbEnableAuthControlByTopicName(topicName);
- if (topicAuthControlEntity == null) {
- try {
- brokerConfManager.confSetBdbTopicAuthControl(
- new BdbTopicAuthControlEntity(topicName,
- false, createUser, createDate));
- } catch (Exception ee) {
- //
- }
- }
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- webConsumerGroupEntity.setGroupTopicName(topicName);
- webConsumerGroupEntity.setConsumerGroupName(groupName);
- List<BdbConsumerGroupEntity> resultEntities =
- brokerConfManager.confGetBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- if (resultEntities.isEmpty()) {
- try {
- brokerConfManager.confAddAllowedConsumerGroup(
- new BdbConsumerGroupEntity(topicName,
- groupName, createUser, createDate));
- } catch (Throwable e2) {
- //
- }
+ public StringBuilder adminAddConsumerGroupInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getAndValidTopicNameInfo(req,
+ metaDataManager, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ // add allowed consume records
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(opEntity, groupName,
+ topicName, Boolean.TRUE, "enable consume", null, null, sBuffer, result));
}
- brokerConfManager.confAddNewGroupFilterCond(
- new BdbGroupFilterCondEntity(topicName, groupName,
- filterCondStatus, strNewFilterConds, createUser, createDate));
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ }
+ return buildRetInfo(retInfoList, sBuffer);
}
/**
- * Add group filter info in batch
+ * Add authorized consumer group info in batch
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchAddGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- List<Map<String, String>> filterJsonArray =
- WebParameterUtils.checkAndGetJsonArray("filterCondJsonSet",
- req.getParameter("filterCondJsonSet"),
- TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((filterJsonArray == null) || (filterJsonArray.isEmpty())) {
- throw new Exception("Null value of filterCondJsonSet, please set the value first!");
- }
- Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
- HashMap<String, BdbGroupFilterCondEntity> inGroupFilterCondEntityMap =
- new HashMap<>();
- for (int j = 0; j < filterJsonArray.size(); j++) {
- Map<String, String> groupObject = filterJsonArray.get(j);
- try {
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String groupTopicName =
- WebParameterUtils.validStringParameter("topicName",
- groupObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- if (!configuredTopicSet.contains(groupTopicName)) {
- throw new Exception(sBuilder.append("Topic ").append(groupTopicName)
- .append(" not configure in master configure, please configure first!").toString());
- }
- int filterCondStatus =
- WebParameterUtils.validIntDataParameter("condStatus",
- groupObject.get("condStatus"),
- false, 0, 0);
- String strNewFilterConds =
- WebParameterUtils.checkAndGetFilterConds(
- (String) groupObject.get("filterConds"),
- true, sBuilder);
- String recordKey = sBuilder.append(groupName)
- .append("-").append(groupTopicName).toString();
- sBuilder.delete(0, sBuilder.length());
- inGroupFilterCondEntityMap.put(recordKey,
- new BdbGroupFilterCondEntity(groupTopicName, groupName,
- filterCondStatus, strNewFilterConds,
- createUser, createDate));
- } catch (Exception ee) {
- sBuilder.delete(0, sBuilder.length());
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(groupObject.toString()).append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- if (inGroupFilterCondEntityMap.isEmpty()) {
- throw new Exception("Not found record in filterCondJsonSet parameter");
- }
- for (BdbGroupFilterCondEntity entity : inGroupFilterCondEntityMap.values()) {
- BdbTopicAuthControlEntity topicAuthControlEntity =
- brokerConfManager.getBdbEnableAuthControlByTopicName(entity.getTopicName());
- if (topicAuthControlEntity == null) {
- try {
- brokerConfManager.confSetBdbTopicAuthControl(
- new BdbTopicAuthControlEntity(entity.getTopicName(),
- false, createUser, createDate));
- } catch (Exception ee) {
- //
- }
- }
- BdbConsumerGroupEntity groupEntity =
- new BdbConsumerGroupEntity();
- groupEntity.setGroupTopicName(entity.getTopicName());
- groupEntity.setConsumerGroupName(entity.getConsumerGroupName());
- List<BdbConsumerGroupEntity> webConsumerGroupEntities =
- brokerConfManager.confGetBdbAllowedConsumerGroupSet(groupEntity);
- if (webConsumerGroupEntities.isEmpty()) {
- try {
- brokerConfManager.confAddAllowedConsumerGroup(
- new BdbConsumerGroupEntity(entity.getTopicName(),
- entity.getConsumerGroupName(), createUser, createDate));
- } catch (Throwable e2) {
- //
- }
- }
- brokerConfManager.confAddNewGroupFilterCond(entity);
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+ public StringBuilder adminBatchAddConsumerGroupInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- return sBuilder;
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get groupNameJsonSet info
+ if (!getGroupCsmJsonSetInfo(req, opEntity, Boolean.TRUE, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Map<String, GroupConsumeCtrlEntity> addRecordMap =
+ (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
+ // add or update and build result
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(entry, sBuffer, result));
+ }
+ return buildRetInfo(retInfoList, sBuffer);
}
/**
- * Modify group filter condition info
+ * Delete allowed(authorized) consumer group info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminModGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- String topicName =
- WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- Set<String> configuredTopicSet =
- brokerConfManager.getTotalConfiguredTopicNames();
- if (!configuredTopicSet.contains(topicName)) {
- throw new Exception(sBuilder.append("Topic: ").append(topicName)
- .append(" not configure in master's topic configure, please configure first!").toString());
- }
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- BdbGroupFilterCondEntity curFilterCondEntity =
- brokerConfManager.getBdbAllowedGroupFilterConds(topicName, groupName);
- if (curFilterCondEntity == null) {
- throw new Exception(sBuilder
- .append("Not found group filter condition configure record by topicName=")
- .append(topicName).append(", groupName=")
- .append(groupName).toString());
- }
- boolean foundChange = false;
- BdbGroupFilterCondEntity newFilterCondEntity =
- new BdbGroupFilterCondEntity(curFilterCondEntity.getTopicName(),
- curFilterCondEntity.getConsumerGroupName(),
- curFilterCondEntity.getControlStatus(),
- curFilterCondEntity.getFilterCondStr(),
- modifyUser, modifyDate);
- int filterCondStatus =
- WebParameterUtils.validIntDataParameter("condStatus",
- req.getParameter("condStatus"),
- false,
- TBaseConstants.META_VALUE_UNDEFINED,
- 0);
- if (filterCondStatus != TBaseConstants.META_VALUE_UNDEFINED
- && filterCondStatus != curFilterCondEntity.getControlStatus()) {
- foundChange = true;
- newFilterCondEntity.setControlStatus(filterCondStatus);
- }
- String strNewFilterConds =
- WebParameterUtils.checkAndGetFilterConds(req.getParameter("filterConds"), false, sBuilder);
- if (TStringUtils.isNotBlank(strNewFilterConds)) {
- if (!curFilterCondEntity.getFilterCondStr().equals(strNewFilterConds)) {
- foundChange = true;
- newFilterCondEntity.setFilterCondStr(strNewFilterConds);
- }
- }
- if (foundChange) {
- try {
- brokerConfManager.confModGroupFilterCondConfig(newFilterCondEntity);
- } catch (Throwable ee) {
- //
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+ public StringBuilder adminDeleteConsumerGroupInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- return sBuilder;
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ List<GroupProcessResult> retInfoList =
+ metaDataManager.delGroupConsumeCtrlConf(opEntity.getModifyUser(),
+ groupNameSet, topicNameSet, sBuffer, result);
+ return buildRetInfo(retInfoList, sBuffer);
}
/**
- * Modify group filter condition info in batch
+ * Add group filter condition info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchModGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- List<Map<String, String>> jsonArray =
- WebParameterUtils.checkAndGetJsonArray("filterCondJsonSet",
- req.getParameter("filterCondJsonSet"),
- TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((jsonArray == null) || (jsonArray.isEmpty())) {
- throw new Exception("Null value of filterCondJsonSet, please set the value first!");
- }
- Set<String> batchRecords = new HashSet<>();
- List<BdbGroupFilterCondEntity> modifyFilterCondEntities = new ArrayList<>();
- for (int j = 0; j < jsonArray.size(); j++) {
- Map<String, String> groupObject = jsonArray.get(j);
- try {
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String topicName =
- WebParameterUtils.validStringParameter("topicName",
- groupObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- BdbGroupFilterCondEntity curFilterCondEntity =
- brokerConfManager.getBdbAllowedGroupFilterConds(topicName, groupName);
- if (curFilterCondEntity == null) {
- throw new Exception(sBuilder
- .append("Not found group filter condition configure record by topicName=")
- .append(topicName)
- .append(", groupName=")
- .append(groupName).toString());
- }
- String recordKey = sBuilder.append(groupName)
- .append("-").append(topicName).toString();
- sBuilder.delete(0, sBuilder.length());
- if (batchRecords.contains(recordKey)) {
- continue;
- }
- boolean foundChange = false;
- BdbGroupFilterCondEntity newFilterCondEntity =
- new BdbGroupFilterCondEntity(curFilterCondEntity.getTopicName(),
- curFilterCondEntity.getConsumerGroupName(),
- curFilterCondEntity.getControlStatus(),
- curFilterCondEntity.getFilterCondStr(),
- modifyUser, modifyDate);
- int filterCondStatus =
- WebParameterUtils.validIntDataParameter("condStatus",
- groupObject.get("condStatus"),
- false, TBaseConstants.META_VALUE_UNDEFINED,
- 0);
- if (filterCondStatus != TBaseConstants.META_VALUE_UNDEFINED
- && filterCondStatus != curFilterCondEntity.getControlStatus()) {
- foundChange = true;
- newFilterCondEntity.setControlStatus(filterCondStatus);
- }
- String strNewFilterConds =
- WebParameterUtils.checkAndGetFilterConds(
- (String) groupObject.get("filterConds"),
- false, sBuilder);
- if (TStringUtils.isNotBlank(strNewFilterConds)) {
- if (!curFilterCondEntity.getFilterCondStr().equals(strNewFilterConds)) {
- foundChange = true;
- newFilterCondEntity.setFilterCondStr(strNewFilterConds);
- }
- }
- if (!foundChange) {
- continue;
- }
- batchRecords.add(recordKey);
- modifyFilterCondEntities.add(newFilterCondEntity);
- } catch (Exception ee) {
- sBuilder.delete(0, sBuilder.length());
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(groupObject.toString())
- .append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- for (BdbGroupFilterCondEntity tmpFilterCondEntity : modifyFilterCondEntities) {
- try {
- brokerConfManager.confModGroupFilterCondConfig(tmpFilterCondEntity);
- } catch (Throwable ee) {
- //
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminAddGroupFilterCondInfo(HttpServletRequest req) {
+ return innAddOrModGroupFilterCondInfo(req, true);
}
/**
- * Delete group filter condition info
+ * Modify group filter condition info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminDeleteGroupFilterCondInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, false, null, sBuilder);
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- false, false, null, sBuilder);
- if (batchOpGroupNames.isEmpty()) {
- for (String tmpTopicName : batchOpTopicNames) {
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(tmpTopicName);
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- if (!webFilterCondEntities.isEmpty()) {
- webFilterCondEntity.setCreateUser("System");
- brokerConfManager.confDelBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- }
- }
- } else {
- for (String tmpTopicName : batchOpTopicNames) {
- for (String tmpGroupName : batchOpGroupNames) {
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(tmpTopicName);
- webFilterCondEntity.setConsumerGroupName(tmpGroupName);
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- if (!webFilterCondEntities.isEmpty()) {
- webFilterCondEntity.setCreateUser("System");
- brokerConfManager.confDelBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- }
- }
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminModGroupFilterCondInfo(HttpServletRequest req) {
+ return innAddOrModGroupFilterCondInfo(req, false);
}
/**
- * Re-balance group allocation info
+ * Add group filter info in batch
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminRebalanceGroupAllocateInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- req.getParameter("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int reJoinWait =
- WebParameterUtils.validIntDataParameter("reJoinWait",
- req.getParameter("reJoinWait"),
- false, 0, 0);
- Set<String> batchOpConsumerIds = new HashSet<>();
- String inputConsumerId = req.getParameter("consumerId");
- if (TStringUtils.isNotBlank(inputConsumerId)) {
- inputConsumerId = inputConsumerId.trim();
- String[] strInputConsumerIds =
- inputConsumerId.split(TokenConstants.ARRAY_SEP);
- for (int i = 0; i < strInputConsumerIds.length; i++) {
- if (TStringUtils.isBlank(strInputConsumerIds[i])) {
- continue;
- }
- String consumerId = strInputConsumerIds[i].trim();
- if (consumerId.length() > TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH) {
- throw new Exception(sBuilder.append("The max length of ")
- .append(consumerId)
- .append(" in consumerId parameter over ")
- .append(TServerConstants.CFG_CONSUMER_CLIENTID_MAX_LENGTH)
- .append(" characters").toString());
- }
- if (!consumerId.matches(TBaseConstants.META_TMP_CONSUMERID_VALUE)) {
- throw new Exception(sBuilder.append("The value of ").append(consumerId)
- .append("in consumerId parameter must begin with a letter, " +
- "can only contain characters,numbers,dot,scores,and underscores").toString());
- }
- if (!batchOpConsumerIds.contains(consumerId)) {
- batchOpConsumerIds.add(consumerId);
- }
- }
- }
- if (batchOpConsumerIds.isEmpty()) {
- throw new Exception("Null value of required consumerId parameter");
- }
- ConsumerInfoHolder consumerInfoHolder =
- master.getConsumerHolder();
- ConsumerBandInfo consumerBandInfo =
- consumerInfoHolder.getConsumerBandInfo(groupName);
- if (consumerBandInfo == null) {
- return sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"The group(")
- .append(groupName).append(") not online! \"}");
- } else {
- Map<String, NodeRebInfo> nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
- for (String consumerId : batchOpConsumerIds) {
- if (nodeRebInfoMap.containsKey(consumerId)) {
- return sBuilder
- .append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Duplicated set for consumerId(")
- .append(consumerId).append(") in group(")
- .append(groupName).append(")! \"}");
- }
- }
- logger.info(sBuilder.append("[Re-balance] Add rebalance consumer: group=")
- .append(groupName).append(", consumerIds=")
- .append(batchOpConsumerIds.toString())
- .append(", reJoinWait=").append(reJoinWait)
- .append(", creator=").append(modifyUser).toString());
- sBuilder.delete(0, sBuilder.length());
- consumerInfoHolder.addRebConsumerInfo(groupName, batchOpConsumerIds, reJoinWait);
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- }
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminBatchAddGroupFilterCondInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdGroupFilterCondInfo(req, true); // isAddOp = true: run the shared batch add/update routine in add mode
}
-
/**
- * Add authorized consumer group info
+ * Modify group filter condition info in batch
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminAddConsumerGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- Set<String> configuredTopicSet =
- brokerConfManager.getTotalConfiguredTopicNames();
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, true, configuredTopicSet, sBuilder);
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- for (String tmpTopicName : batchOpTopicNames) {
- BdbTopicAuthControlEntity topicAuthControlEntity =
- brokerConfManager.getBdbEnableAuthControlByTopicName(tmpTopicName);
- if (topicAuthControlEntity == null) {
- try {
- brokerConfManager.confSetBdbTopicAuthControl(
- new BdbTopicAuthControlEntity(tmpTopicName,
- false, createUser, createDate));
- } catch (Exception ee) {
- //
- }
- }
- for (String tmpGroupName : batchOpGroupNames) {
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity(tmpTopicName,
- tmpGroupName, createUser, createDate);
- brokerConfManager.confAddAllowedConsumerGroup(webConsumerGroupEntity);
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminBatchModGroupFilterCondInfo(HttpServletRequest req) {
+ return innBatchAddOrUpdGroupFilterCondInfo(req, false); // isAddOp = false: run the shared batch add/update routine in update mode
}
/**
- * Add authorized consumer group info in batch
+ * Delete group filter condition info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchAddConsumerGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- List<Map<String, String>> jsonArray =
- WebParameterUtils.checkAndGetJsonArray("groupNameJsonSet",
- req.getParameter("groupNameJsonSet"),
- TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((jsonArray == null) || (jsonArray.isEmpty())) {
- throw new Exception("Null value of groupNameJsonSet, please set the value first!");
- }
- Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
- HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
- new HashMap<>();
- for (int j = 0; j < jsonArray.size(); j++) {
- Map<String, String> groupObject = jsonArray.get(j);
- try {
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String groupTopicName =
- WebParameterUtils.validStringParameter("topicName",
- groupObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- true, "");
- String groupCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- groupObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null);
- Date groupCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- groupObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, null);
- if ((TStringUtils.isBlank(groupCreateUser))
- || (groupCreateDate == null)) {
- groupCreateUser = createUser;
- groupCreateDate = createDate;
- }
- if (!configuredTopicSet.contains(groupTopicName)) {
- throw new Exception(sBuilder.append("Topic ").append(groupTopicName)
- .append(" not configure in master configure, please configure first!").toString());
- }
- String recordKey = sBuilder.append(groupName)
- .append("-")
- .append(groupTopicName).toString();
- sBuilder.delete(0, sBuilder.length());
- inGroupAuthConfEntityMap.put(recordKey,
- new BdbConsumerGroupEntity(groupTopicName,
- groupName, groupCreateUser, groupCreateDate));
- } catch (Exception ee) {
- sBuilder.delete(0, sBuilder.length());
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(groupObject.toString()).append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
-
- }
- if (inGroupAuthConfEntityMap.isEmpty()) {
- throw new Exception("Not found record in groupNameJsonSet parameter");
- }
- for (BdbConsumerGroupEntity tmpGroupEntity : inGroupAuthConfEntityMap.values()) {
- BdbTopicAuthControlEntity topicAuthControlEntity =
- brokerConfManager.getBdbEnableAuthControlByTopicName(tmpGroupEntity.getGroupTopicName());
- if (topicAuthControlEntity == null) {
- try {
- brokerConfManager.confSetBdbTopicAuthControl(
- new BdbTopicAuthControlEntity(tmpGroupEntity.getGroupTopicName(),
- false, createUser, createDate));
- } catch (Exception ee) {
- //
- }
- }
- brokerConfManager.confAddAllowedConsumerGroup(tmpGroupEntity);
+ public StringBuilder adminDeleteGroupFilterCondInfo(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token; every failed step below answers with a fail result built from result.errInfo
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info; the resulting entity is passed to every per-record call below
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get the group name set
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // get the topic name set; NOTE(review): read via getStringParamValue here while innAddOrModGroupFilterCondInfo uses getAndValidTopicNameInfo - confirm the difference is intended
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (String groupName : groupNameSet) { // one metadata call per (group, topic) pair
+ for (String topicName : topicNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(opEntity, // the delete is expressed as an add-or-update call, not a removal
+ groupName, topicName, Boolean.TRUE, "enable consume", // presumably the consume-enable flag and its reason - TODO confirm against MetaDataManager
+ false, TServerConstants.BLANK_FILTER_ITEM_STR, sBuffer, result)); // filterEnable = false and the blank-filter constant as the filter string
}
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
}
- return sBuilder;
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-record results into the response
}
-
/**
- * Delete allowed(authorized) consumer group info
+ * Re-balance group allocation info
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminDeleteConsumerGroupInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, false, null, sBuilder);
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- false, false, null, sBuilder);
- if (batchOpGroupNames.isEmpty()) {
- for (String tmpTopicName : batchOpTopicNames) {
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(tmpTopicName);
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- if (!webFilterCondEntities.isEmpty()) {
- webFilterCondEntity.setCreateUser("System");
- brokerConfManager.confDelBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- }
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- webConsumerGroupEntity.setGroupTopicName(tmpTopicName);
- brokerConfManager.confDelBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- }
- } else {
- for (String tmpTopicName : batchOpTopicNames) {
- for (String tmpGroupName : batchOpGroupNames) {
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(tmpTopicName);
- webFilterCondEntity.setConsumerGroupName(tmpGroupName);
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- if (!webFilterCondEntities.isEmpty()) {
- webFilterCondEntity.setCreateUser("System");
- brokerConfManager.confDelBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- }
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- webConsumerGroupEntity.setGroupTopicName(tmpTopicName);
- webConsumerGroupEntity.setConsumerGroupName(tmpGroupName);
- brokerConfManager.confDelBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- }
- }
+ public StringBuilder adminRebalanceGroupAllocateInfo(HttpServletRequest req) {
+ // Flow: authorize -> read operator info, group name, reJoinWait and the consumerId set ->
+ // reject when the group has no online consumer info or a consumerId is already present in
+ // the group's rebalance map -> log and register the consumerIds for rebalance.
+ // Every exit returns sBuffer holding either a fail result or the success result.
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get group configure info
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ String groupName = (String) result.getRetData();
+ // get reJoinWait info
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.REJOINWAIT, false, 0, 0, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int reJoinWait = (int) result.getRetData();
+ // get consumerId list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSCONSUMERID, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrInfo());
+ return sBuffer;
+ }
+ Set<String> consumerIdSet = (Set<String>) result.getRetData();
+ ConsumerInfoHolder consumerInfoHolder =
+ master.getConsumerHolder();
+ ConsumerBandInfo consumerBandInfo =
+ consumerInfoHolder.getConsumerBandInfo(groupName);
+ if (consumerBandInfo == null) {
+ // sBuffer is used as scratch space for the message, then cleared before the result is built
+ String errInfo = sBuffer.append("The group(")
+ .append(groupName).append(") not online!").toString();
+ sBuffer.delete(0, sBuffer.length());
+ WebParameterUtils.buildFailResult(sBuffer, errInfo);
+ return sBuffer;
+ }
+ Map<String, NodeRebInfo> nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
+ for (String consumerId : consumerIdSet) {
+ if (nodeRebInfoMap.containsKey(consumerId)) {
+ // bare message only, like the "not online" branch above: the JSON terminator that the
+ // pre-refactor hand-built literal carried must not be part of the text given to buildFailResult
+ String errInfo = sBuffer.append("Duplicated set for consumerId(")
+ .append(consumerId).append(") in group(")
+ .append(groupName).append(")!").toString();
+ sBuffer.delete(0, sBuffer.length());
+ WebParameterUtils.buildFailResult(sBuffer, errInfo);
+ return sBuffer;
}
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
}
- return sBuilder;
+ // all checks passed: log the request, then clear the scratch buffer before building the reply
+ logger.info(sBuffer.append("[Re-balance] Add rebalance consumer: group=")
+ .append(groupName).append(", consumerIds=")
+ .append(consumerIdSet.toString())
+ .append(", reJoinWait=").append(reJoinWait)
+ .append(", creator=").append(opEntity.getModifyUser()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ consumerInfoHolder.addRebConsumerInfo(groupName, consumerIdSet, reJoinWait);
+ WebParameterUtils.buildSuccessResult(sBuffer);
+ return sBuffer;
}
-
-
-
-
-
-
/**
* Add consumer group setting
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminAddConsumeGroupSettingInfo(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int enableBind =
- WebParameterUtils.validIntDataParameter("enableBind",
- req.getParameter("enableBind"),
- false, 0, 0);
- int allowedBClientRate =
- WebParameterUtils.validIntDataParameter("allowedBClientRate",
- req.getParameter("allowedBClientRate"),
- false, 0, 0);
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- for (String tmpGroupName : batchOpGroupNames) {
- BdbConsumeGroupSettingEntity webConsumeGroupSettingEntity =
- new BdbConsumeGroupSettingEntity(tmpGroupName,
- enableBind, allowedBClientRate, "", createUser, createDate);
- brokerConfManager.confAddBdbConsumeGroupSetting(webConsumeGroupSettingEntity);
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminAddConsumeGroupSettingInfo(HttpServletRequest req) {
+ return innAddOrUpdConsumeGroupSettingInfo(req, true); // isAddOp = true: run the shared add/update routine in add mode
}
/**
- * Add consumer group setting in batch
+ * Update consumer group setting
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminBatchAddConsumeGroupSetting(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int enableBind =
- WebParameterUtils.validIntDataParameter("enableBind",
- req.getParameter("enableBind"),
- false, 0, 0);
- int allowedBClientRate =
- WebParameterUtils.validIntDataParameter("allowedBClientRate",
- req.getParameter("allowedBClientRate"),
- false, 0, 0);
- List<Map<String, String>> groupNameJsonArray =
- WebParameterUtils.checkAndGetJsonArray("groupNameJsonSet",
- req.getParameter("groupNameJsonSet"),
- TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((groupNameJsonArray == null) || (groupNameJsonArray.isEmpty())) {
- throw new Exception("Null value of groupNameJsonSet, please set the value first!");
- }
- HashMap<String, BdbConsumeGroupSettingEntity> inOffsetRstGroupEntityMap =
- new HashMap<>();
- for (int j = 0; j < groupNameJsonArray.size(); j++) {
- Map<String, String> groupObject = groupNameJsonArray.get(j);
- try {
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String groupCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- groupObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, createUser);
- Date groupCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- groupObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, createDate);
- int groupEnableBind =
- WebParameterUtils.validIntDataParameter("enableBind",
- groupObject.get("enableBind"),
- false, enableBind, 0);
- int groupAllowedBClientRate =
- WebParameterUtils.validIntDataParameter("allowedBClientRate",
- groupObject.get("allowedBClientRate"),
- false, allowedBClientRate, 0);
- inOffsetRstGroupEntityMap.put(groupName,
- new BdbConsumeGroupSettingEntity(groupName,
- groupEnableBind, groupAllowedBClientRate,
- "", groupCreateUser, groupCreateDate));
- } catch (Exception ee) {
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(groupObject.toString())
- .append(", exception is : ")
- .append(ee.getMessage()).toString());
- }
- }
- if (inOffsetRstGroupEntityMap.isEmpty()) {
- throw new Exception("Not found record in groupNameJsonSet parameter");
- }
- for (BdbConsumeGroupSettingEntity tmpGroupEntity
- : inOffsetRstGroupEntityMap.values()) {
- brokerConfManager.confAddBdbConsumeGroupSetting(tmpGroupEntity);
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminUpdConsumeGroupSetting(HttpServletRequest req) {
+ return innAddOrUpdConsumeGroupSettingInfo(req, false); // isAddOp = false: run the shared add/update routine in update mode
}
-
/**
- * Update consumer group setting
+ * Add consumer group setting in batch
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminUpdConsumeGroupSetting(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String modifyUser =
- WebParameterUtils.validStringParameter("modifyUser",
- req.getParameter("modifyUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date modifyDate =
- WebParameterUtils.validDateParameter("modifyDate",
- req.getParameter("modifyDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- int enableBind =
- WebParameterUtils.validIntDataParameter("enableBind",
- req.getParameter("enableBind"),
- false, -2, 0);
- int allowedBClientRate =
- WebParameterUtils.validIntDataParameter("allowedBClientRate",
- req.getParameter("allowedBClientRate"),
- false, -2, 0);
- if (enableBind == -2
- && allowedBClientRate == -2) {
- throw new Exception("Not require update content in request parameter!");
- }
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- for (String tmpGroupName : batchOpGroupNames) {
- try {
- boolean isChanged = false;
- BdbConsumeGroupSettingEntity oldEntity =
- brokerConfManager.getBdbConsumeGroupSetting(tmpGroupName);
- if (oldEntity == null) {
- continue;
- }
- BdbConsumeGroupSettingEntity newEntity =
- new BdbConsumeGroupSettingEntity(oldEntity);
- if (enableBind != -2) {
- if (newEntity.getEnableBind() != enableBind) {
- isChanged = true;
- newEntity.setEnableBind(enableBind);
- }
- }
- if (allowedBClientRate != -2) {
- if (allowedBClientRate != newEntity.getAllowedBrokerClientRate()) {
- isChanged = true;
- newEntity.setAllowedBrokerClientRate(allowedBClientRate);
- }
- }
- if (isChanged) {
- brokerConfManager.confUpdBdbConsumeGroupSetting(newEntity);
- }
- } catch (Throwable e) {
- //
- }
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+ public StringBuilder adminBatchAddConsumeGroupSetting(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token; every failed step below answers with a fail result built from result.errInfo
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info (add mode); used as the default operator entity for the JSON items
+ if (!WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get groupNameJsonSet info; on success result carries a map of GroupResCtrlEntity records
+ if (!getGroupCtrlJsonSetInfo(req, opEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- return sBuilder;
+ Map<String, GroupResCtrlEntity> addRecordMap =
+ (Map<String, GroupResCtrlEntity>) result.getRetData();
+ // submit each parsed record to the metadata manager and collect the per-record results
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (GroupResCtrlEntity resCtrlEntity : addRecordMap.values()) {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(
+ resCtrlEntity, sBuffer, result));
+ }
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-record results into the response
}
/**
@@ -1464,27 +828,40 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminDeleteConsumeGroupSetting(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- Set<String> batchOpGroupNames =
- WebParameterUtils.getBatchGroupNames(req.getParameter("groupName"),
- true, false, null, sBuilder);
- brokerConfManager.confDeleteBdbConsumeGroupSetting(batchOpGroupNames, sBuilder);
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
- }
- return sBuilder;
+ public StringBuilder adminDeleteConsumeGroupSetting(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token; every failed step below answers with a fail result built from result.errInfo
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info; the resulting entity is passed to every per-group call below
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get the group name set
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // the delete is carried out as an update (isAddOp = false) per group: resource check FALSE, allowed broker/client rate 0, remaining fields passed as undefined/null
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (String groupName : groupNameSet) {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(false, opEntity,
+ groupName, Boolean.FALSE, 0,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TBaseConstants.META_VALUE_UNDEFINED, null, sBuffer, result));
+ }
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-group results into the response
}
-
private StringBuilder buildRetInfo(List<GroupProcessResult> retInfo,
StringBuilder sBuffer) {
int totalCnt = 0;
@@ -1502,17 +879,261 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
- private boolean getGroupJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
- StringBuilder sBuffer, ProcessResult result) {
+ /**
+ * Inner method: add or update the consumer group setting of one or more groups.
+ *
+ * Validates the admin token, then reads the operator info, the group name set,
+ * the resource-check switch and the allowed broker/client rate from the request,
+ * issues one addOrUpdGroupResCtrlConf call per group and aggregates the
+ * per-group results with buildRetInfo. The first failed validation step ends
+ * the call with a fail result built from result.errInfo.
+ *
+ * @param req      the HTTP request carrying the parameters
+ * @param isAddOp  true for the add path, false for the update path; it also
+ *                 selects the fallback arguments handed to the parameter readers
+ *                 (presumably the defaults for absent optional fields - TODO confirm)
+ * @return the buffer holding the response text
+ */
+ private StringBuilder innAddOrUpdConsumeGroupSettingInfo(HttpServletRequest req,
+ boolean isAddOp) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info for the add or update path
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get the group name set
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // get the resource-check switch: false is passed on add, null on update
+ if (!WebParameterUtils.getBooleanParamValue(req, WebFieldDef.RESCHECKENABLE,
+ false, (isAddOp ? false : null), sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Boolean resChkEnable = (Boolean) result.getRetData();
+ // get and validate allowedBClientRate: GROUP_BROKER_CLIENT_RATE_MIN is passed on add, META_VALUE_UNDEFINED on update
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.OLDALWDBCRATE,
+ false, (isAddOp ? TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN
+ : TBaseConstants.META_VALUE_UNDEFINED),
+ TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int allowedBClientRate = (int) result.getRetData();
+ // add or update one group control record per group; the two paths use different overloads
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (String groupName : groupNameSet) {
+ if (isAddOp) {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(opEntity,
+ groupName, resChkEnable, allowedBClientRate, sBuffer, result));
+ } else {
+ retInfoList.add(metaDataManager.addOrUpdGroupResCtrlConf(isAddOp, opEntity,
+ groupName, resChkEnable, allowedBClientRate,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TBaseConstants.META_VALUE_UNDEFINED, null, sBuffer, result));
+ }
+ }
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-group results into the response
+ }
+
+ /**
+ * Inner method: add or modify the filter condition info of group/topic pairs.
+ *
+ * Validates the admin token, then reads the operator info, the group name set,
+ * the validated topic name set, the condStatus switch and the filter condition
+ * string from the request, issues one addOrUpdGroupConsumeCtrlInfo call per
+ * (group, topic) pair and aggregates the results with buildRetInfo. The first
+ * failed validation step ends the call with a fail result built from
+ * result.errInfo.
+ *
+ * @param req      the HTTP request carrying the parameters
+ * @param isAddOp  true for the add path, false for the modify path
+ * @return the buffer holding the response text
+ */
+ private StringBuilder innAddOrModGroupFilterCondInfo(HttpServletRequest req,
+ boolean isAddOp) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info for the add or modify path
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // get the group name set
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
+ // check and get the topic name set, validated with the help of metaDataManager
+ if (!WebParameterUtils.getAndValidTopicNameInfo(req,
+ metaDataManager, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // check and get the condStatus field: false is passed on add, null on modify
+ if (!getCondStatusParamValue(req, false, (isAddOp ? false : null), sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Boolean filterEnable = (Boolean) result.getRetData();
+ // get the filter condition string
+ if (!WebParameterUtils.getFilterCondString(req, false, isAddOp, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ String filterCondStr = (String) result.getRetData();
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ // add or modify one consume control record per (group, topic) pair; the two paths use different overloads
+ for (String groupName : groupNameSet) {
+ for (String topicName : topicNameSet) {
+ if (isAddOp) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(opEntity,
+ groupName, topicName, Boolean.TRUE, "enable consume",
+ filterEnable, filterCondStr, sBuffer, result));
+ } else {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(isAddOp,
+ opEntity, groupName, topicName, Boolean.TRUE, "enable consume",
+ filterEnable, filterCondStr, sBuffer, result));
+ }
+ }
+ }
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-record results into the response
+ }
+
+ /**
+ * Inner method: add or update group filter condition info in batch.
+ *
+ * Validates the admin token, reads the operator info, parses the filter JSON
+ * set of the request into GroupConsumeCtrlEntity records via
+ * getFilterJsonSetInfo, submits every record to the metadata manager and
+ * aggregates the results with buildRetInfo. The first failed validation step
+ * ends the call with a fail result built from result.errInfo.
+ *
+ * @param req      the HTTP request carrying the parameters
+ * @param isAddOp  true for the add path, false for the update path
+ * @return the buffer holding the response text
+ */
+ private StringBuilder innBatchAddOrUpdGroupFilterCondInfo(HttpServletRequest req,
+ boolean isAddOp) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // validate the admin authorization token
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get the operator info; used as the default operator entity for the JSON items
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check and get filterCondJsonSet info; on success result carries the map of parsed records
+ if (!getFilterJsonSetInfo(req, isAddOp, opEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ Map<String, GroupConsumeCtrlEntity> addRecordMap =
+ (Map<String, GroupConsumeCtrlEntity>) result.getRetData();
+ // add or update each record (only the map values are used) and collect the per-record results
+ List<GroupProcessResult> retInfoList = new ArrayList<>();
+ for (GroupConsumeCtrlEntity entry : addRecordMap.values()) {
+ if (isAddOp) {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(
+ entry, sBuffer, result));
+ } else {
+ retInfoList.add(metaDataManager.addOrUpdGroupConsumeCtrlInfo(
+ isAddOp, entry, sBuffer, result));
+ }
+ }
+ return buildRetInfo(retInfoList, sBuffer); // aggregate the per-record results into the response
+ }
+
+ /**
+ * Parse the filter JSON set of the request into GroupConsumeCtrlEntity records.
+ *
+ * Each JSON item must name a group and an already configured topic. On success
+ * the records are stored in result as a map keyed by "groupName-topicName";
+ * on the first invalid item, or when no record was found, result holds the
+ * failure and the method returns false.
+ *
+ * @param req          the HTTP request carrying the JSON set
+ * @param isAddOp      true for the add path, false for the update path
+ * @param defOpEntity  operator info handed to getAUDBaseInfo for every item
+ * @param sBuffer      scratch buffer for building messages
+ * @param result       receives the record map or the failure information
+ * @return result.isSuccess()
+ */
+ private boolean getFilterJsonSetInfo(HttpServletRequest req, boolean isAddOp,
+ BaseEntity defOpEntity, StringBuilder sBuffer,
+ ProcessResult result) {
+ if (!WebParameterUtils.getJsonArrayParamValue(req,
+ WebFieldDef.FILTERJSONSET, true, null, result)) {
+ return result.isSuccess();
+ }
+ List<Map<String, String>> groupJsonArray =
+ (List<Map<String, String>>) result.getRetData();
+ GroupConsumeCtrlEntity itemEntity;
+ Map<String, String> itemValueMap;
+ Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
+ Set<String> configuredTopicSet =
+ metaDataManager.getTotalConfiguredTopicNames();
+ for (int j = 0; j < groupJsonArray.size(); j++) {
+ itemValueMap = groupJsonArray.get(j);
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
+ isAddOp, defOpEntity, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
+ // get group configure info
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
+ return result.isSuccess();
+ }
+ String groupName = (String) result.getRetData();
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
+ return result.isSuccess();
+ }
+ String topicName = (String) result.getRetData();
+ // the topic must already be configured
+ if (!configuredTopicSet.contains(topicName)) {
+ result.setFailResult(sBuffer
+ .append(WebFieldDef.TOPICNAME.name)
+ .append(" ").append(topicName)
+ .append(" is not configure, please configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ // check and get condStatus field
+ // NOTE(review): condStatus and the filter string below are read from req, not from
+ // itemValueMap, so every item gets the same values - confirm whether per-item values
+ // were intended (no Map-based overloads of these two helpers are visible here)
+ if (!getCondStatusParamValue(req, false,
+ (isAddOp ? false : null), sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean filterEnable = (Boolean) result.getRetData();
+ // get filterConds info
+ if (!WebParameterUtils.getFilterCondString(req,
+ false, isAddOp, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ String filterCondStr = (String) result.getRetData();
+ itemEntity =
+ new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
+ itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
+ true, "enable consume", filterEnable, filterCondStr);
+ // key on group AND topic (the record key format of the pre-refactor code) so that several
+ // topics of one group are all kept instead of the last one overwriting the others
+ addRecordMap.put(groupName + "-" + topicName, itemEntity);
+ }
+ // check result
+ if (addRecordMap.isEmpty()) {
+ result.setFailResult(sBuffer
+ .append("Not found record info in ")
+ .append(WebFieldDef.FILTERJSONSET.name)
+ .append(" parameter!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(addRecordMap);
+ return result.isSuccess();
+ }
+
+ private boolean getGroupCtrlJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
+ StringBuilder sBuffer, ProcessResult result) {
if (!WebParameterUtils.getJsonArrayParamValue(req,
WebFieldDef.GROUPJSONSET, true, null, result)) {
- return result.success;
+ return result.isSuccess();
}
List<Map<String, String>> groupJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
GroupResCtrlEntity itemEntity;
Map<String, String> itemValueMap;
Map<String, GroupResCtrlEntity> addRecordMap = new HashMap<>();
+ Set<String> configuredTopicSet =
+ metaDataManager.getTotalConfiguredTopicNames();
for (int j = 0; j < groupJsonArray.size(); j++) {
itemValueMap = groupJsonArray.get(j);
// check and get operation info
@@ -1524,15 +1145,87 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
// get group configure info
if (!WebParameterUtils.getStringParamValue(itemValueMap,
WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
- return result.success;
+ return result.isSuccess();
+ }
+ String groupName = (String) result.getRetData();
+ // get resCheckStatus info
+ if (!WebParameterUtils.getBooleanParamValue(itemValueMap, WebFieldDef.RESCHECKENABLE,
+ false, false, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ Boolean resChkEnable = (Boolean) result.getRetData();
+ // get and valid allowedBClientRate info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.OLDALWDBCRATE,
+ false, TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN,
+ TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, sBuffer, result)) {
+ return result.isSuccess();
}
- String groupName = (String) result.retData1;
+ int allowedB2CRate = (int) result.getRetData();
itemEntity =
new GroupResCtrlEntity(itemOpEntity, groupName);
- itemEntity.updModifyInfo(itemEntity.getDataVerId(),
- Boolean.FALSE, "Old API batch set", null,
- TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
- null, TBaseConstants.META_VALUE_UNDEFINED, null);
+ itemEntity.updModifyInfo(itemOpEntity.getDataVerId(), resChkEnable, allowedB2CRate,
+ TBaseConstants.META_VALUE_UNDEFINED, null,
+ TBaseConstants.META_VALUE_UNDEFINED, null);
+ addRecordMap.put(itemEntity.getGroupName(), itemEntity);
+ }
+ // check result
+ if (addRecordMap.isEmpty()) {
+ result.setFailResult(sBuffer
+ .append("Not found record info in ")
+ .append(WebFieldDef.GROUPJSONSET.name)
+ .append(" parameter!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(addRecordMap);
+ return result.isSuccess();
+ }
+
+ private boolean getGroupCsmJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
+ Boolean enableCsm, StringBuilder sBuffer,
+ ProcessResult result) {
+ if (!WebParameterUtils.getJsonArrayParamValue(req,
+ WebFieldDef.GROUPJSONSET, true, null, result)) {
+ return result.isSuccess();
+ }
+ List<Map<String, String>> groupJsonArray =
+ (List<Map<String, String>>) result.getRetData();
+ GroupConsumeCtrlEntity itemEntity;
+ Map<String, String> itemValueMap;
+ Map<String, GroupConsumeCtrlEntity> addRecordMap = new HashMap<>();
+ Set<String> configuredTopicSet =
+ metaDataManager.getTotalConfiguredTopicNames();
+ for (int j = 0; j < groupJsonArray.size(); j++) {
+ itemValueMap = groupJsonArray.get(j);
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(itemValueMap,
+ true, defOpEntity, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
+ // get group configure info
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
+ return result.isSuccess();
+ }
+ String groupName = (String) result.getRetData();
+ if (!WebParameterUtils.getStringParamValue(itemValueMap,
+ WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
+ return result.isSuccess();
+ }
+ String topicName = (String) result.getRetData();
+ if (!configuredTopicSet.contains(topicName)) {
+ result.setFailResult(sBuffer
+ .append(WebFieldDef.TOPICNAME.name)
+ .append(" ").append(topicName)
+ .append(" is not configure, please configure first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ itemEntity =
+ new GroupConsumeCtrlEntity(itemOpEntity, groupName, topicName);
+ itemEntity.updModifyInfo(itemOpEntity.getDataVerId(),
+ enableCsm, "Old API batch set", null, null);
addRecordMap.put(itemEntity.getGroupName(), itemEntity);
}
// check result
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index 0c764d3..79823ab 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -17,26 +17,25 @@
package org.apache.tubemq.server.master.web.handler;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
+@Deprecated
public class WebAdminTopicAuthHandler extends AbstractWebHandler {
public WebAdminTopicAuthHandler(TMaster master) {
@@ -58,353 +57,274 @@ public class WebAdminTopicAuthHandler extends AbstractWebHandler {
}
/**
- * Enable or disable topic authorization control
+ * Query topic authorization control
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminEnableDisableTopicAuthControl(
- HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate",
- req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, new Date());
- boolean isEnable =
- WebParameterUtils.validBooleanDataParameter("isEnable",
- req.getParameter("isEnable"),
- false, false);
- Set<String> configuredTopicSet =
- brokerConfManager.getTotalConfiguredTopicNames();
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, true, configuredTopicSet, sBuilder);
- for (String topicName : batchOpTopicNames) {
- brokerConfManager.confSetBdbTopicAuthControl(
- new BdbTopicAuthControlEntity(topicName,
- isEnable, createUser, createDate));
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+ public StringBuilder adminQueryTopicAuthControl(HttpServletRequest req) {
+ // Answers a topic authorization query: one JSON object per matched
+ // TopicCtrlEntity, each carrying the consume-control records of that topic.
+ // Parameter failures are reported through buildFailResult and returned early.
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ TopicCtrlEntity qryEntity = new TopicCtrlEntity();
+ // get queried operation info, for createUser, modifyUser, dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
}
- return sBuilder;
- }
-
- /**
- * Add topic authorization control in batch
- *
- * @param req
- * @return
- * @throws Exception
- */
- public StringBuilder adminBatchAddTopicAuthControl(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String operator =
- WebParameterUtils.validStringParameter("createUser", req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH, true, "");
- Date createDate =
- WebParameterUtils.validDateParameter("createDate", req.getParameter("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- List<Map<String, String>> topicJsonArray =
- WebParameterUtils.checkAndGetJsonArray("topicJsonSet",
- req.getParameter("topicJsonSet"), TBaseConstants.META_VALUE_UNDEFINED, true);
- if ((topicJsonArray == null) || (topicJsonArray.isEmpty())) {
- throw new Exception("Null value of topicJsonSet, please set the value first!");
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // NOTE(review): the returned data is cast to a Set here, so the COMPSTOPICNAME
+ // field presumably yields a set of names - confirm against WebParameterUtils.
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // query matched records
+ Map<String, TopicCtrlEntity> topicCtrlMap =
+ metaDataManager.getTopicCtrlConf(topicNameSet, qryEntity);
+ // build query result
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (TopicCtrlEntity entity : topicCtrlMap.values()) {
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
}
- Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
- HashMap<String, BdbTopicAuthControlEntity> inTopicAuthConfEntityMap =
- new HashMap<>();
- HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
- new HashMap<>();
- for (int count = 0; count < topicJsonArray.size(); count++) {
- Map<String, String> jsonObject = topicJsonArray.get(count);
- try {
- String topicName =
- WebParameterUtils.validStringParameter("topicName", jsonObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
- boolean enableControl =
- WebParameterUtils.validBooleanDataParameter("isEnable",
- jsonObject.get("isEnable"),
- false, false);
- String itemCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- jsonObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null);
- Date itemCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- jsonObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, null);
- if ((TStringUtils.isBlank(itemCreateUser)) || (itemCreateDate == null)) {
- itemCreateUser = operator;
- itemCreateDate = createDate;
+ sBuffer.append("{\"topicName\":\"").append(entity.getTopicName())
+ .append("\",\"isEnable\":").append(entity.isAuthCtrlEnable())
+ .append(",\"createUser\":\"").append(entity.getCreateUser())
+ .append("\",\"createDate\":\"").append(entity.getCreateDateStr())
+ .append("\",\"authConsumeGroup\":[");
+ // The same record list feeds both the authConsumeGroup array and the
+ // authFilterCondSet array below, so groupCount and filterCount are equal.
+ List<GroupConsumeCtrlEntity> groupEntity =
+ metaDataManager.getConsumeCtrlByTopic(entity.getTopicName());
+ int j = 0;
+ // the isEmpty() guard is redundant: iterating an empty list is a no-op
+ if (!groupEntity.isEmpty()) {
+ for (GroupConsumeCtrlEntity itemEntity : groupEntity) {
+ if (j++ > 0) {
+ sBuffer.append(",");
}
- if (!configuredTopicSet.contains(topicName)) {
- throw new Exception(sBuilder.append("Topic: ").append(topicName)
- .append(" not configure in master's topic configure, please configure first!")
- .toString());
- }
- inTopicAuthConfEntityMap.put(topicName, new BdbTopicAuthControlEntity(topicName,
- enableControl, itemCreateUser, itemCreateDate));
- inGroupAuthConfEntityMap =
- getAuthConsumeGroupInfo(topicName, operator,
- createDate, jsonObject, inGroupAuthConfEntityMap, sBuilder);
- } catch (Exception ee) {
- sBuilder.delete(0, sBuilder.length());
- throw new Exception(sBuilder.append("Process data exception, data is :")
- .append(jsonObject.toString()).append(", exception is : ")
- .append(ee.getMessage()).toString());
+ sBuffer.append("{\"topicName\":\"").append(itemEntity.getTopicName())
+ .append("\",\"groupName\":\"")
+ .append(itemEntity.getGroupName())
+ .append("\",\"createUser\":\"")
+ .append(itemEntity.getCreateUser())
+ .append("\",\"createDate\":\"")
+ .append(itemEntity.getCreateDateStr())
+ .append("\"}");
}
}
- if (inTopicAuthConfEntityMap.isEmpty()) {
- throw new Exception("Not found record in topicJsonSet parameter");
- }
- for (BdbTopicAuthControlEntity tmpTopicEntity : inTopicAuthConfEntityMap.values()) {
- brokerConfManager.confSetBdbTopicAuthControl(tmpTopicEntity);
- }
- for (BdbConsumerGroupEntity tmpGroupEntity : inGroupAuthConfEntityMap.values()) {
- brokerConfManager.confAddAllowedConsumerGroup(tmpGroupEntity);
+ sBuffer.append("],\"groupCount\":").append(j).append(",\"authFilterCondSet\":[");
+ int y = 0;
+ for (GroupConsumeCtrlEntity condEntity : groupEntity) {
+ if (y++ > 0) {
+ sBuffer.append(",");
+ }
+ // condStatus is reported as 2 when filter consume is enabled, otherwise 0
+ int condStatusId = condEntity.isEnableFilterConsume() ? 2 : 0;
+ sBuffer.append("{\"topicName\":\"").append(condEntity.getTopicName())
+ .append("\",\"groupName\":\"").append(condEntity.getGroupName())
+ .append("\",\"condStatus\":").append(condStatusId);
+ // filter strings of at most two characters are reported as empty
+ // NOTE(review): presumably a bare delimiter pair; also assumes
+ // getFilterCondStr() never returns null - confirm
+ if (condEntity.getFilterCondStr().length() <= 2) {
+ sBuffer.append(",\"filterConds\":\"\"");
+ } else {
+ sBuffer.append(",\"filterConds\":\"")
+ .append(condEntity.getFilterCondStr())
+ .append("\"");
+ }
+ sBuffer.append(",\"createUser\":\"").append(condEntity.getCreateUser())
+ .append("\",\"createDate\":\"").append(condEntity.getCreateDateStr())
+ .append("\"}");
}
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+ sBuffer.append("],\"filterCount\":").append(y).append("}");
}
- return sBuilder;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
}
/**
- * Delete topic authorization control
+ * Enable or disable topic authorization control
*
* @param req
* @return
- * @throws Exception
*/
- public StringBuilder adminDeleteTopicAuthControl(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- try {
- WebParameterUtils.reqAuthorizeCheck(master, brokerConfManager,
- req.getParameter("confModAuthToken"));
- String createUser =
- WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- true, "");
- Set<String> batchOpTopicNames =
- WebParameterUtils.getBatchTopicNames(req.getParameter("topicName"),
- true, false, null, sBuilder);
- for (String tmpTopicName : batchOpTopicNames) {
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(tmpTopicName);
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- if (!webFilterCondEntities.isEmpty()) {
- webFilterCondEntity.setCreateUser(createUser);
- brokerConfManager.confDelBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- }
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- webConsumerGroupEntity.setGroupTopicName(tmpTopicName);
- List<BdbConsumerGroupEntity> webConsumerGroupEntities =
- brokerConfManager.confGetBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- if (!webConsumerGroupEntities.isEmpty()) {
- webConsumerGroupEntity.setRecordCreateUser(createUser);
- brokerConfManager.confDelBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- }
- BdbTopicAuthControlEntity webTopicAuthControlEntity =
- new BdbTopicAuthControlEntity();
- webTopicAuthControlEntity.setTopicName(tmpTopicName);
- webTopicAuthControlEntity.setCreateUser(createUser);
- brokerConfManager.confDeleteBdbTopicAuthControl(webTopicAuthControlEntity);
- }
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\"}");
+    public StringBuilder adminEnableDisableTopicAuthControl(HttpServletRequest req) {
+        final StringBuilder sBuffer = new StringBuilder(512);
+        final ProcessResult result = new ProcessResult();
+        // Authorization is validated before the operator info is parsed; "&&"
+        // short-circuits, so a failed authorization skips the second check.
+        final boolean baseInfoOk =
+                WebParameterUtils.validReqAuthorizeInfo(req,
+                        WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)
+                && WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result);
+        if (!baseInfoOk) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        final BaseEntity opEntity = (BaseEntity) result.getRetData();
+        // topic name(s) the status change applies to
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        final Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // requested authorization-control status (isEnable field)
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.ISENABLE, false, false, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
-        return sBuilder;
+        final Boolean enableTopicAuth = (Boolean) result.getRetData();
+        // apply the status topic by topic, keeping one outcome per topic
+        final List<TopicProcessResult> perTopicResults = new ArrayList<>();
+        for (String topicName : topicNameSet) {
+            TopicProcessResult topicResult = metaDataManager.addOrUpdTopicCtrlConf(
+                    opEntity, topicName, enableTopicAuth, sBuffer, result);
+            perTopicResults.add(topicResult);
+        }
+        return buildRetInfo(perTopicResults, sBuffer);
     }
/**
- * Query topic authorization control
+ * Add topic authorization control in batch
*
* @param req
* @return
* @throws Exception
*/
- public StringBuilder adminQueryTopicAuthControl(HttpServletRequest req) throws Exception {
- StringBuilder sBuilder = new StringBuilder(512);
- BdbTopicAuthControlEntity queryEntity =
- new BdbTopicAuthControlEntity();
- try {
- queryEntity
- .setTopicName(WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- false, null));
- queryEntity
- .setCreateUser(WebParameterUtils.validStringParameter("createUser",
- req.getParameter("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null));
- List<BdbTopicAuthControlEntity> resultEntities =
- brokerConfManager.confGetBdbTopicAuthCtrlEntityList(queryEntity);
- SimpleDateFormat formatter =
- new SimpleDateFormat(TBaseConstants.META_TMP_DATE_VALUE);
- int i = 0;
- sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
- .append(resultEntities.size()).append(",\"data\":[");
- for (BdbTopicAuthControlEntity entity : resultEntities) {
- if (i++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(entity.getTopicName())
- .append("\",\"isEnable\":").append(entity.isEnableAuthControl())
- .append(",\"createUser\":\"").append(entity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
- .append("\",\"authConsumeGroup\":[");
- BdbConsumerGroupEntity webConsumerGroupEntity =
- new BdbConsumerGroupEntity();
- webConsumerGroupEntity.setGroupTopicName(entity.getTopicName());
- List<BdbConsumerGroupEntity> webConsumerGroupEntities =
- brokerConfManager.confGetBdbAllowedConsumerGroupSet(webConsumerGroupEntity);
- int j = 0;
- if (!webConsumerGroupEntities.isEmpty()) {
- for (BdbConsumerGroupEntity itemEntity : webConsumerGroupEntities) {
- if (j++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(itemEntity.getGroupTopicName())
- .append("\",\"groupName\":\"")
- .append(itemEntity.getConsumerGroupName())
- .append("\",\"createUser\":\"")
- .append(itemEntity.getRecordCreateUser())
- .append("\",\"createDate\":\"")
- .append(formatter.format(itemEntity.getRecordCreateDate()))
- .append("\"}");
- }
- }
- sBuilder.append("],\"groupCount\":").append(j)
- .append(",\"authFilterCondSet\":[");
- BdbGroupFilterCondEntity webFilterCondEntity =
- new BdbGroupFilterCondEntity();
- webFilterCondEntity.setTopicName(entity.getTopicName());
- List<BdbGroupFilterCondEntity> webFilterCondEntities =
- brokerConfManager.confGetBdbAllowedGroupFilterCondSet(webFilterCondEntity);
- int y = 0;
- for (BdbGroupFilterCondEntity condEntity : webFilterCondEntities) {
- if (y++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append("{\"topicName\":\"").append(condEntity.getTopicName())
- .append("\",\"groupName\":\"").append(condEntity.getConsumerGroupName())
- .append("\",\"condStatus\":").append(condEntity.getControlStatus());
- if (condEntity.getFilterCondStr().length() <= 2) {
- sBuilder.append(",\"filterConds\":\"\"");
- } else {
- sBuilder.append(",\"filterConds\":\"")
- .append(condEntity.getFilterCondStr())
- .append("\"");
- }
- sBuilder.append(",\"createUser\":\"").append(condEntity.getCreateUser())
- .append("\",\"createDate\":\"").append(formatter.format(condEntity.getCreateDate()))
- .append("\"}");
- }
- sBuilder.append("],\"filterCount\":").append(y).append("}");
- }
- sBuilder.append("]}");
- } catch (Exception e) {
- sBuilder.delete(0, sBuilder.length());
- sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
+    public StringBuilder adminBatchAddTopicAuthControl(HttpServletRequest req) throws Exception {
+        final StringBuilder sBuffer = new StringBuilder(512);
+        final ProcessResult result = new ProcessResult();
+        // Authorization first, then operator info; "&&" short-circuits, so the
+        // operator info is only parsed for an authorized request.
+        final boolean baseInfoOk =
+                WebParameterUtils.validReqAuthorizeInfo(req,
+                        WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)
+                && WebParameterUtils.getAUDBaseInfo(req, true, null, sBuffer, result);
+        if (!baseInfoOk) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
         }
-        return sBuilder;
+        final BaseEntity opEntity = (BaseEntity) result.getRetData();
+        // turn the topicJsonSet parameter into one TopicCtrlEntity per topic name
+        if (!getTopicCtrlJsonSetInfo(req, opEntity, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        final Map<String, TopicCtrlEntity> addRecordMap =
+                (Map<String, TopicCtrlEntity>) result.getRetData();
+        // store every parsed record and collect the per-topic outcome
+        final List<TopicProcessResult> perTopicResults = new ArrayList<>();
+        for (TopicCtrlEntity topicCtrlInfo : addRecordMap.values()) {
+            TopicProcessResult topicResult =
+                    metaDataManager.addOrUpdTopicCtrlConf(topicCtrlInfo, sBuffer, result);
+            perTopicResults.add(topicResult);
+        }
+        return buildRetInfo(perTopicResults, sBuffer);
     }
/**
- * Private method to get authorized consumer group info
+ * Delete topic authorization control
*
- * @param topicName
- * @param operator
- * @param createDate
- * @param jsonObject
- * @param groupAuthEntityMap
- * @param sBuilder
+ * @param req
* @return
- * @throws Exception
*/
- private HashMap<String, BdbConsumerGroupEntity> getAuthConsumeGroupInfo(
- final String topicName,
- final String operator,
- final Date createDate,
- final Map<String, String> jsonObject,
- HashMap<String, BdbConsumerGroupEntity> groupAuthEntityMap,
- final StringBuilder sBuilder) throws Exception {
- String strAuthConsumGroup = (String) jsonObject.get("authConsumeGroup");
- if ((strAuthConsumGroup != null) && (!TStringUtils.isBlank(strAuthConsumGroup))) {
- List<Map<String, String>> authConsumeGroupSet =
- new Gson().fromJson(strAuthConsumGroup, new TypeToken<List<Map<String, String>>>(){}.getType());
- if ((authConsumeGroupSet != null)
- && (!authConsumeGroupSet.isEmpty())) {
- for (int j = 0; j < authConsumeGroupSet.size(); j++) {
- Map<String, String> groupObject = authConsumeGroupSet.get(j);
- String groupName =
- WebParameterUtils.validGroupParameter("groupName",
- groupObject.get("groupName"),
- TBaseConstants.META_MAX_GROUPNAME_LENGTH,
- true, "");
- String groupTopicName =
- WebParameterUtils.validStringParameter("topicName",
- groupObject.get("topicName"),
- TBaseConstants.META_MAX_TOPICNAME_LENGTH,
- false, topicName);
- if (!groupTopicName.equals(topicName)) {
- throw new Exception("TopicName not equal in authConsumeGroup!");
- }
- String groupCreateUser =
- WebParameterUtils.validStringParameter("createUser",
- groupObject.get("createUser"),
- TBaseConstants.META_MAX_USERNAME_LENGTH,
- false, null);
- Date groupCreateDate =
- WebParameterUtils.validDateParameter("createDate",
- groupObject.get("createDate"),
- TBaseConstants.META_MAX_DATEVALUE_LENGTH,
- false, null);
- if ((TStringUtils.isBlank(groupCreateUser))
- || (groupCreateDate == null)) {
- groupCreateUser = operator;
- groupCreateDate = createDate;
- }
- String recordKey = sBuilder.append(groupName)
- .append("-").append(groupTopicName).toString();
- sBuilder.delete(0, sBuilder.length());
- groupAuthEntityMap.put(recordKey,
- new BdbConsumerGroupEntity(topicName,
- groupName, groupCreateUser, groupCreateDate));
- }
+    public StringBuilder adminDeleteTopicAuthControl(HttpServletRequest req) {
+        final StringBuilder sBuffer = new StringBuilder(512);
+        final ProcessResult result = new ProcessResult();
+        // Authorization first, then operator info; "&&" short-circuits, so the
+        // operator info is only parsed for an authorized request.
+        final boolean baseInfoOk =
+                WebParameterUtils.validReqAuthorizeInfo(req,
+                        WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)
+                && WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result);
+        if (!baseInfoOk) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        final BaseEntity opEntity = (BaseEntity) result.getRetData();
+        // topic name(s) whose authorization control is dropped
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+            return sBuffer;
+        }
+        final Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // No record is removed here: every topic is passed to addOrUpdTopicCtrlConf
+        // with Boolean.FALSE (presumably switching authorization control off -
+        // confirm in MetaDataManager). The call's return value is not used; the
+        // reported entry is built from the shared result object instead.
+        final List<TopicProcessResult> perTopicResults = new ArrayList<>();
+        for (String topicName : topicNameSet) {
+            metaDataManager.addOrUpdTopicCtrlConf(opEntity,
+                    topicName, Boolean.FALSE, sBuffer, result);
+            TopicProcessResult topicResult = new TopicProcessResult(
+                    TBaseConstants.META_VALUE_UNDEFINED, topicName, result);
+            perTopicResults.add(topicResult);
+        }
+        return buildRetInfo(perTopicResults, sBuffer);
+    }
+
+    /**
+     * Parses the topicJsonSet request parameter into topic control records.
+     *
+     * <p>For every array item the operator info, the topic name and the isEnable
+     * flag are read; the first item that fails validation aborts the parse. Items
+     * are keyed by topic name, so a later item with the same topic name replaces
+     * an earlier one. On success the record map is stored in {@code result}.
+     *
+     * @param req          the http request carrying the topicJsonSet parameter
+     * @param defOpEntity  operator info handed to getAUDBaseInfo for every item
+     *                     (presumably used as the default when an item has none - confirm)
+     * @param sBuffer      scratch buffer used to assemble error messages
+     * @param result       receives the record map on success, the error on failure
+     * @return the success flag of {@code result} after parsing
+     */
+    private boolean getTopicCtrlJsonSetInfo(HttpServletRequest req, BaseEntity defOpEntity,
+                                            StringBuilder sBuffer, ProcessResult result) {
+        if (!WebParameterUtils.getJsonArrayParamValue(req,
+                WebFieldDef.TOPICJSONSET, true, null, result)) {
+            return result.isSuccess();
+        }
+        List<Map<String, String>> topicJsonArray =
+                (List<Map<String, String>>) result.getRetData();
+        TopicCtrlEntity itemConf;
+        Map<String, TopicCtrlEntity> addRecordMap = new HashMap<>();
+        // check and get each topic control record
+        for (Map<String, String> confMap : topicJsonArray) {
+            // check and get operation info
+            if (!WebParameterUtils.getAUDBaseInfo(confMap,
+                    true, defOpEntity, sBuffer, result)) {
+                return result.isSuccess();
+            }
+            BaseEntity itemOpEntity = (BaseEntity) result.getRetData();
+            // get topicName configure info
+            if (!WebParameterUtils.getStringParamValue(confMap,
+                    WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
+                return result.isSuccess();
             }
+            String topicName = (String) result.getRetData();
+            // get authCtrlStatus info
+            if (!WebParameterUtils.getBooleanParamValue(confMap, WebFieldDef.ISENABLE,
+                    false, false, sBuffer, result)) {
+                return result.isSuccess();
+            }
+            Boolean enableTopicAuth = (Boolean) result.getRetData();
+            itemConf = new TopicCtrlEntity(itemOpEntity, topicName);
+            itemConf.updModifyInfo(itemOpEntity.getDataVerId(),
+                    TBaseConstants.META_VALUE_UNDEFINED,
+                    TBaseConstants.META_VALUE_UNDEFINED, enableTopicAuth);
+            addRecordMap.put(itemConf.getTopicName(), itemConf);
+        }
+        // an empty array yields no records, which is reported as a failure
+        if (addRecordMap.isEmpty()) {
+            result.setFailResult(sBuffer
+                    .append("Not found record in ")
+                    .append(WebFieldDef.TOPICJSONSET.name)
+                    .append(" parameter!").toString());
+            // leave the shared buffer clean for the caller
+            sBuffer.delete(0, sBuffer.length());
+            return result.isSuccess();
+        }
+        result.setSuccResult(addRecordMap);
+        return result.isSuccess();
+    }
+
+    /**
+     * Renders the per-topic outcomes as the data array of a success answer.
+     *
+     * <p>Null entries are skipped and do not count towards the reported total.
+     *
+     * @param retInfo  per-topic outcomes to render
+     * @param sBuffer  buffer the answer is appended to
+     * @return {@code sBuffer}, holding the complete answer
+     */
+    private StringBuilder buildRetInfo(List<TopicProcessResult> retInfo,
+                                       StringBuilder sBuffer) {
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        int written = 0;
+        for (TopicProcessResult item : retInfo) {
+            if (item != null) {
+                // separate entries with a comma, but not before the first one
+                if (written > 0) {
+                    sBuffer.append(",");
+                }
+                written++;
+                sBuffer.append("{\"topicName\":\"");
+                sBuffer.append(item.getTopicName()).append("\"");
+                sBuffer.append(",\"success\":").append(item.isSuccess());
+                sBuffer.append(",\"errCode\":").append(item.getErrCode());
+                sBuffer.append(",\"errInfo\":\"").append(item.getErrInfo()).append("\"}");
+            }
         }
-        return groupAuthEntityMap;
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, written);
+        return sBuffer;
     }
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index 3b663e7..2e57e16 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -103,14 +103,14 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
// get brokerIp info
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPBROKERIP, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> brokerIpSet = (Set<String>) result.retData1;
+ Set<String> brokerIpSet = (Set<String>) result.getRetData();
// get brokerPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
@@ -174,21 +174,21 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// get isInclude info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.ISINCLUDE, false, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean isInclude = (Boolean) result.retData1;
+ Boolean isInclude = (Boolean) result.getRetData();
// get withTopic info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.WITHTOPIC, false, false, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean withTopic = (Boolean) result.retData1;
+ Boolean withTopic = (Boolean) result.getRetData();
// fill query entity fields
qryEntity.updModifyInfo(qryEntity.getDataVerId(), brokerPort, brokerTlsPort,
brokerWebPort, regionId, groupId, mngStatus, brokerProps);
@@ -281,14 +281,14 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean isReservedData = (Boolean) result.retData1;
+ Boolean isReservedData = (Boolean) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
List<BrokerProcessResult> retInfo =
metaDataManager.delBrokerConfInfo(opEntity.getModifyUser(),
isReservedData, brokerIds, sBuffer, result);
@@ -390,7 +390,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
for (Integer brokerId : brokerIdSet) {
retInfo.add(metaDataManager.addOrUpdBrokerConfig(isAddOp, opEntity,
brokerId, "", brokerPort, brokerTlsPort, brokerWebPort,
@@ -525,7 +525,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return result.success;
}
List<Map<String, String>> brokerJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
// check and get cluster default setting info
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
@@ -607,7 +607,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebFieldDef.BROKERID, true, sBuffer, result)) {
return result.isSuccess();
}
- Integer brokerId = (Integer) result.retData1;
+ Integer brokerId = (Integer) result.getRetData();
itemEntity = new BrokerConfEntity(itemOpEntity, brokerId, "");
itemEntity.updModifyInfo(itemOpEntity.getDataVerId(), brokerPort, brokerTlsPort,
brokerWebPort, regionId, groupId, mngStatus, brokerProps);
@@ -653,7 +653,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
WebFieldDef.BROKERIP, true, null, sBuffer, result)) {
return result.success;
}
- String brokerIp = (String) result.retData1;
+ String brokerIp = (String) result.getRetData();
// get brokerId
if (!WebParameterUtils.getIntParamValue(paramCntr,
WebFieldDef.BROKERID, true, 0, 0, sBuffer, result)) {
@@ -708,7 +708,7 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
ManageStatus.STATUS_MANAGE_OFFLINE.getCode(), sBuffer, result)) {
return result.success;
}
- int manageStatusId = (int) result.retData1;
+ int manageStatusId = (int) result.getRetData();
try {
ManageStatus mngStatus = ManageStatus.valueOf(manageStatusId);
result.setSuccResult(mngStatus);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 5a6558c..06daeb0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -81,34 +81,34 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupSet = (Set<String>) result.retData1;
+ Set<String> groupSet = (Set<String>) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.CONSUMEENABLE, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean consumeEnable = (Boolean) result.retData1;
+ Boolean consumeEnable = (Boolean) result.getRetData();
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.FILTERENABLE, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean filterEnable = (Boolean) result.retData1;
+ Boolean filterEnable = (Boolean) result.getRetData();
// get filterConds info
if (!WebParameterUtils.getFilterCondSet(req, false, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> filterCondSet = (Set<String>) result.retData1;
+ Set<String> filterCondSet = (Set<String>) result.getRetData();
qryEntity.updModifyInfo(qryEntity.getDataVerId(),
consumeEnable, null, filterEnable, null);
Map<String, List<GroupConsumeCtrlEntity>> qryResultMap =
@@ -203,14 +203,14 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// execute delete operation
List<GroupProcessResult> retInfo =
metaDataManager.delGroupConsumeCtrlConf(opEntity.getModifyUser(),
@@ -241,14 +241,14 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// get groupName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> groupNameSet = (Set<String>) result.retData1;
+ Set<String> groupNameSet = (Set<String>) result.getRetData();
// get consumeEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.CONSUMEENABLE, false,
@@ -256,7 +256,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean consumeEnable = (Boolean) result.retData1;
+ Boolean consumeEnable = (Boolean) result.getRetData();
// get disableReason list
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.REASON, false,
@@ -264,7 +264,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String disableRsn = (String) result.retData1;
+ String disableRsn = (String) result.getRetData();
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.FILTERENABLE, false,
@@ -272,13 +272,13 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean filterEnable = (Boolean) result.retData1;
+ Boolean filterEnable = (Boolean) result.getRetData();
// get filterConds info
if (!WebParameterUtils.getFilterCondString(req, false, isAddOp, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String filterCondStr = (String) result.retData1;
+ String filterCondStr = (String) result.getRetData();
// add group resource record
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : groupNameSet) {
@@ -352,7 +352,7 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
return result.success;
}
List<Map<String, String>> filterJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
// parse groupCsmJsonSet field info
GroupConsumeCtrlEntity itemConf;
Map<String, String> itemsMap;
@@ -365,12 +365,12 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
return result.success;
}
- String groupName = (String) result.retData1;
+ String groupName = (String) result.getRetData();
if (!WebParameterUtils.getStringParamValue(itemsMap,
WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
return result.success;
}
- String topicName = (String) result.retData1;
+ String topicName = (String) result.getRetData();
if (!configuredTopicSet.contains(topicName)) {
result.setFailResult(sBuffer
.append(WebFieldDef.TOPICNAME.name)
@@ -385,26 +385,26 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
(isAddOp ? true : null), sBuffer, result)) {
return result.isSuccess();
}
- Boolean consumeEnable = (Boolean) result.retData1;
+ Boolean consumeEnable = (Boolean) result.getRetData();
// get disableReason list
if (!WebParameterUtils.getStringParamValue(itemsMap,
WebFieldDef.REASON, false, (isAddOp ? "" : null), sBuffer, result)) {
return result.isSuccess();
}
- String disableRsn = (String) result.retData1;
+ String disableRsn = (String) result.getRetData();
// get filterEnable info
if (!WebParameterUtils.getBooleanParamValue(itemsMap,
WebFieldDef.FILTERENABLE, false,
(isAddOp ? false : null), sBuffer, result)) {
return result.isSuccess();
}
- Boolean filterEnable = (Boolean) result.retData1;
+ Boolean filterEnable = (Boolean) result.getRetData();
// get filterConds info
if (!WebParameterUtils.getFilterCondString(
itemsMap, false, isAddOp, sBuffer, result)) {
return result.isSuccess();
}
- String filterCondStr = (String) result.retData1;
+ String filterCondStr = (String) result.getRetData();
// add record object
itemConf = new GroupConsumeCtrlEntity(defOpEntity, groupName, topicName);
itemConf.updModifyInfo(defOpEntity.getDataVerId(),
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 476b1e9..20cce00 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -81,21 +81,14 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> inGroupSet = (Set<String>) result.retData1;
- // get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.CONSUMEENABLE, false, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Boolean consumeEnable = (Boolean) result.retData1;
+ Set<String> inGroupSet = (Set<String>) result.getRetData();
// get resCheckStatus info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.RESCHECKENABLE, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean resCheckEnable = (Boolean) result.retData1;
+ Boolean resCheckEnable = (Boolean) result.getRetData();
// get and valid qryPriorityId info
if (!WebParameterUtils.getQryPriorityIdParameter(req,
false, TBaseConstants.META_VALUE_UNDEFINED,
@@ -103,15 +96,15 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- int inQryPriorityId = (int) result.retData1;
+ int inQryPriorityId = (int) result.getRetData();
// get flowCtrlEnable info
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.FLOWCTRLENABLE, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean flowCtrlEnable = (Boolean) result.retData1;
- entity.updModifyInfo(entity.getDataVerId(), consumeEnable, null,
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
+ entity.updModifyInfo(entity.getDataVerId(),
resCheckEnable, TBaseConstants.META_VALUE_UNDEFINED, inQryPriorityId,
flowCtrlEnable, TBaseConstants.META_VALUE_UNDEFINED, null);
Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
@@ -199,7 +192,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
+ Set<String> batchGroupNames = (Set<String>) result.getRetData();
// delete group resource record
List<GroupProcessResult> retInfo =
metaDataManager.delGroupResCtrlConf(opEntity.getModifyUser(),
@@ -229,28 +222,14 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> batchGroupNames = (Set<String>) result.retData1;
- // get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(req, WebFieldDef.CONSUMEENABLE,
- false, (isAddOp ? true : null), sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Boolean consumeEnable = (Boolean) result.retData1;
- // get disableReason info
- if (!WebParameterUtils.getStringParamValue(req, WebFieldDef.REASON,
- false, (isAddOp ? "" : null), sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- String disableRsn = (String) result.retData1;
+ Set<String> batchGroupNames = (Set<String>) result.getRetData();
// get resCheckStatus info
if (!WebParameterUtils.getBooleanParamValue(req, WebFieldDef.RESCHECKENABLE,
false, (isAddOp ? false : null), sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean resCheckEnable = (Boolean) result.retData1;
+ Boolean resCheckEnable = (Boolean) result.getRetData();
// get and valid allowedBrokerClientRate info
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.ALWDBCRATE,
false, (isAddOp ? TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN
@@ -259,7 +238,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- int allowedBClientRate = (int) result.retData1;
+ int allowedBClientRate = (int) result.getRetData();
// get def cluster setting info
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
@@ -271,14 +250,14 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- int qryPriorityId = (int) result.retData1;
+ int qryPriorityId = (int) result.getRetData();
// get flowCtrlEnable info
if (!WebParameterUtils.getBooleanParamValue(req, WebFieldDef.FLOWCTRLENABLE,
false, (isAddOp ? false : null), sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean flowCtrlEnable = (Boolean) result.retData1;
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
// get and flow control rule info
int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req,
(isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), sBuffer, result);
@@ -286,15 +265,13 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- String flowCtrlInfo = (String) result.retData1;
+ String flowCtrlInfo = (String) result.getRetData();
// add group resource record
- GroupProcessResult retItem;
List<GroupProcessResult> retInfo = new ArrayList<>();
for (String groupName : batchGroupNames) {
- retItem = metaDataManager.addOrUpdGroupResCtrlConf(isAddOp, opEntity, groupName,
- consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
- qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuffer, result);
- retInfo.add(retItem);
+ retInfo.add(metaDataManager.addOrUpdGroupResCtrlConf(isAddOp, opEntity, groupName,
+ resCheckEnable, allowedBClientRate, qryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo, sBuffer, result));
}
return buildRetInfo(retInfo, sBuffer);
}
@@ -339,7 +316,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return result.success;
}
List<Map<String, String>> ctrlJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
// get default qryPriorityId
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
@@ -361,25 +338,13 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
WebFieldDef.GROUPNAME, true, "", sBuffer, result)) {
return result.success;
}
- String groupName = (String) result.retData1;
- // get consumeEnable info
- if (!WebParameterUtils.getBooleanParamValue(itemValueMap, WebFieldDef.CONSUMEENABLE,
- false, (isAddOp ? true : null), sBuffer, result)) {
- return result.isSuccess();
- }
- Boolean consumeEnable = (Boolean) result.retData1;
- // get disableReason info
- if (!WebParameterUtils.getStringParamValue(itemValueMap,
- WebFieldDef.REASON, false, (isAddOp ? "" : null), sBuffer, result)) {
- return result.isSuccess();
- }
- String disableRsn = (String) result.retData1;
+ String groupName = (String) result.getRetData();
// get resCheckStatus info
if (!WebParameterUtils.getBooleanParamValue(itemValueMap, WebFieldDef.RESCHECKENABLE,
false, (isAddOp ? false : null), sBuffer, result)) {
return result.isSuccess();
}
- Boolean resCheckEnable = (Boolean) result.retData1;
+ Boolean resCheckEnable = (Boolean) result.getRetData();
// get and valid allowedBrokerClientRate info
if (!WebParameterUtils.getIntParamValue(itemValueMap, WebFieldDef.ALWDBCRATE,
false, (isAddOp ? TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN
@@ -387,7 +352,7 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
TServerConstants.GROUP_BROKER_CLIENT_RATE_MIN, sBuffer, result)) {
return result.isSuccess();
}
- int allowedBClientRate = (int) result.retData1;
+ int allowedBClientRate = (int) result.getRetData();
// get def cluster setting info
// get and valid qryPriorityId info
if (!WebParameterUtils.getQryPriorityIdParameter(itemValueMap,
@@ -396,26 +361,26 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
TServerConstants.QRY_PRIORITY_MIN_VALUE, sBuffer, result)) {
return result.isSuccess();
}
- int qryPriorityId = (int) result.retData1;
+ int qryPriorityId = (int) result.getRetData();
// get flowCtrlEnable info
if (!WebParameterUtils.getBooleanParamValue(itemValueMap,
WebFieldDef.FLOWCTRLENABLE, false,
(isAddOp ? false : null), sBuffer, result)) {
return result.isSuccess();
}
- Boolean flowCtrlEnable = (Boolean) result.retData1;
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
// get and flow control rule info
int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(itemValueMap,
(isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), sBuffer, result);
- if (!result.success) {
+ if (!result.isSuccess()) {
return result.isSuccess();
}
- String flowCtrlInfo = (String) result.retData1;
+ String flowCtrlInfo = (String) result.getRetData();
itemEntity =
new GroupResCtrlEntity(itemOpEntity, groupName);
itemEntity.updModifyInfo(itemEntity.getDataVerId(),
- consumeEnable, disableRsn, resCheckEnable, allowedBClientRate,
- qryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
+ resCheckEnable, allowedBClientRate, qryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo);
addRecordMap.put(itemEntity.getGroupName(), itemEntity);
}
// check result
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 3aa491c..aba5cd7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -31,7 +31,6 @@ import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
@@ -61,22 +60,32 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
// register query method
registerQueryWebMethod("admin_query_master_group_info",
"getGroupAddressStrInfo");
- registerQueryWebMethod("admin_query_cluster_default_setting",
- "adminQueryClusterDefSetting");
registerQueryWebMethod("admin_query_cluster_topic_view",
"adminQueryClusterTopicView");
+ registerQueryWebMethod("admin_query_cluster_default_setting",
+ "adminQueryClusterDefSetting");
// register modify method
registerModifyWebMethod("admin_transfer_current_master",
"transferCurrentMaster");
- // register modify method
registerModifyWebMethod("admin_set_cluster_default_setting",
"adminSetClusterDefSetting");
+ registerModifyWebMethod("admin_update_cluster_default_setting",
+ "adminUpdClusterDefSetting");
// Deprecated methods begin
// query method
registerQueryWebMethod("admin_query_def_flow_control_rule",
- "adminQueryDefGroupFlowCtrlRule");
+ "adminQueryDefFlowCtrlRule");
+ // register modify method
+ registerModifyWebMethod("admin_set_def_flow_control_rule",
+ "adminSetDefFlowControlRule");
+ registerModifyWebMethod("admin_rmv_def_flow_control_rule",
+ "adminDelDefFlowControlRule");
+ registerModifyWebMethod("admin_upd_def_flow_control_rule",
+ "adminModDefFlowCtrlRule");
+
+ // Deprecated methods end
}
/**
@@ -149,136 +158,64 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
*
* @param req
* @return
- * @throws Exception
*/
public StringBuilder adminQueryClusterDefSetting(HttpServletRequest req) {
- StringBuilder sBuilder = new StringBuilder(512);
- ClusterSettingEntity defClusterSetting =
- metaDataManager.getClusterDefSetting(true);
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
- if (defClusterSetting != null) {
- defClusterSetting.toWebJsonStr(sBuilder, true, true);
- }
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, 1);
- return sBuilder;
+ StringBuilder sBuffer = new StringBuilder(512);
+ return buildRetInfo(sBuffer, true);
}
/**
- * Add or modify cluster default setting
+ * query default flow control rule
*
* @param req
* @return
*/
- public StringBuilder adminSetClusterDefSetting(HttpServletRequest req) {
- ProcessResult result = new ProcessResult();
+ public StringBuilder adminQueryDefFlowCtrlRule(HttpServletRequest req) {
StringBuilder sBuffer = new StringBuilder(512);
- // valid operation authorize info
- if (!WebParameterUtils.validReqAuthorizeInfo(req,
- WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- // check and get operation info
- if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- BaseEntity opEntity = (BaseEntity) result.getRetData();
- // check max message size
- if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MAXMSGSIZEINMB, false,
- TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
- TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int inMaxMsgSizeMB = (int) result.getRetData();
- // get broker port info
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int inBrokerPort = (int) result.getRetData();
- // get broker tls port info
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int inBrokerTlsPort = (int) result.getRetData();
- // get broker web port info
- if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
- false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int inBrokerWebPort = (int) result.getRetData();
- // get and valid TopicPropGroup info
- TopicPropGroup defTopicProps = new TopicPropGroup();
- defTopicProps.fillDefaultValue();
- if (!WebParameterUtils.getTopicPropInfo(req, defTopicProps, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- defTopicProps = (TopicPropGroup) result.getRetData();
- // get and valid qryPriorityId info
- if (!WebParameterUtils.getQryPriorityIdParameter(req,
- false, TBaseConstants.META_VALUE_UNDEFINED,
- TServerConstants.QRY_PRIORITY_MIN_VALUE, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- int inQryPriorityId = (int) result.retData1;
- // get flowCtrlEnable info
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.FLOWCTRLENABLE, false, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- Boolean flowCtrlEnable = (Boolean) result.retData1;
- // get and flow control rule info
- int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req, null, sBuffer, result);
- if (!result.success) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- String flowCtrlInfo = (String) result.retData1;
- // add or modify record
- ClusterSettingEntity newConf = null;
- ClusterSettingEntity curConf = metaDataManager.getClusterDefSetting(true);
- if (curConf == null) {
- if (!metaDataManager.addClusterDefSetting(opEntity, inBrokerPort,
- inBrokerTlsPort, inBrokerWebPort, inMaxMsgSizeMB,
- inQryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
- defTopicProps, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- } else {
- if (!metaDataManager.modClusterDefSetting(opEntity, inBrokerPort,
- inBrokerTlsPort, inBrokerWebPort, inMaxMsgSizeMB,
- inQryPriorityId, flowCtrlEnable, flowRuleCnt, flowCtrlInfo,
- defTopicProps, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
- return sBuffer;
- }
- }
- curConf = metaDataManager.getClusterDefSetting(true);
- if (curConf == null) {
- WebParameterUtils.buildFailResultWithBlankData(
- DataOpErrCode.DERR_UPD_NOT_EXIST.getCode(),
- DataOpErrCode.DERR_UPD_NOT_EXIST.getDescription(), sBuffer);
- return sBuffer;
- }
- // build return result
- WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- curConf.toWebJsonStr(sBuffer, true, true);
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
- return sBuffer;
+ return buildRetInfo(sBuffer, false);
+ }
+
+ /**
+ * Add cluster default setting
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminSetClusterDefSetting(HttpServletRequest req) {
+ return innAddOrUpdDefFlowControlRule(req, true, true);
+ }
+
+ /**
+ * Modify cluster default setting
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminUpdClusterDefSetting(HttpServletRequest req) {
+ return innAddOrUpdDefFlowControlRule(req, false, true);
+ }
+
+ /**
+ * add default flow control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminSetDefFlowControlRule(HttpServletRequest req) {
+ return innAddOrUpdDefFlowControlRule(req, true, false);
+ }
+
+ /**
+ * update default flow control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminModDefFlowCtrlRule(HttpServletRequest req) {
+ return innAddOrUpdDefFlowControlRule(req, false, false);
}
+
/**
* Query cluster topic overall view
*
@@ -294,14 +231,14 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
// check and get topicName field
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// query topic configure info
Map<String, List<TopicDeployEntity>> topicConfMap =
metaDataManager.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIds);
@@ -376,4 +313,155 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
return sBuffer;
}
+ /**
+ * delete flow control rule
+ *
+ * @param req
+ * @return
+ */
+ public StringBuilder adminDelDefFlowControlRule(HttpServletRequest req) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // add or modify record
+ if (!metaDataManager.addOrUpdClusterDefSetting(opEntity,
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED, Boolean.FALSE, 0,
+ TServerConstants.BLANK_FLOWCTRL_RULES, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ return buildRetInfo(sBuffer, false);
+ }
+
+
+ /**
+ * add default flow control rule
+ *
+ * @param req
+ * @param isAddOp
+ * @param isNewVer
+ * @return
+ */
+ private StringBuilder innAddOrUpdDefFlowControlRule(HttpServletRequest req,
+ boolean isAddOp, boolean isNewVer) {
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuffer = new StringBuilder(512);
+ // valid operation authorize info
+ if (!WebParameterUtils.validReqAuthorizeInfo(req,
+ WebFieldDef.ADMINAUTHTOKEN, true, master, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, isAddOp, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ BaseEntity opEntity = (BaseEntity) result.getRetData();
+ // check max message size
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.MAXMSGSIZEINMB, false,
+ TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int maxMsgSizeMB = (int) result.getRetData();
+ // get broker port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int inBrokerPort = (int) result.getRetData();
+ // get broker tls port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int inBrokerTlsPort = (int) result.getRetData();
+ // get broker web port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int inBrokerWebPort = (int) result.getRetData();
+ // get and valid TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ TopicPropGroup defTopicProps = (TopicPropGroup) result.getRetData();
+ // get and valid qryPriorityId info
+ if (!WebParameterUtils.getQryPriorityIdParameter(req,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.QRY_PRIORITY_MIN_VALUE, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ int inQryPriorityId = (int) result.getRetData();
+ // get flowCtrlEnable info
+ if (isNewVer) {
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.FLOWCTRLENABLE, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ } else {
+ if (!WebParameterUtils.getFlowCtrlStatusParamValue(req,
+ false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ }
+ Boolean flowCtrlEnable = (Boolean) result.getRetData();
+ // get and flow control rule info
+ int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req,
+ (isAddOp ? TServerConstants.BLANK_FLOWCTRL_RULES : null), sBuffer, result);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ String flowCtrlInfo = (String) result.getRetData();
+ // add or modify record
+ if (!metaDataManager.addOrUpdClusterDefSetting(opEntity, inBrokerPort,
+ inBrokerTlsPort, inBrokerWebPort, maxMsgSizeMB, inQryPriorityId,
+ flowCtrlEnable, flowRuleCnt, flowCtrlInfo, defTopicProps, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
+ return sBuffer;
+ }
+ return buildRetInfo(sBuffer, isNewVer);
+ }
+
+
+ private StringBuilder buildRetInfo(StringBuilder sBuffer, boolean isNewVer) {
+ ClusterSettingEntity curConf =
+ metaDataManager.getClusterDefSetting(true);
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ if (curConf != null) {
+ if (isNewVer) {
+ curConf.toWebJsonStr(sBuffer, true, true);
+ } else {
+ curConf.toOldVerFlowCtrlWebJsonStr(sBuffer, true);
+ }
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, 1);
+ return sBuffer;
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 1ab168f..74e1a6f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -76,21 +76,21 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> inGroupNameSet = (Set<String>) result.retData1;
+ Set<String> inGroupNameSet = (Set<String>) result.getRetData();
if (inGroupNameSet.isEmpty()) {
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSCONSUMEGROUP, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- inGroupNameSet = (Set<String>) result.retData1;
+ inGroupNameSet = (Set<String>) result.getRetData();
}
if (!WebParameterUtils.getStringParamValue(req,
WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
Set<String> queryGroupSet =
topicPSInfoManager.getGroupSetWithSubTopic(inGroupNameSet, topicNameSet);
@@ -139,7 +139,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
return sBuffer;
}
}
- String strConsumeGroup = (String) result.retData1;
+ String strConsumeGroup = (String) result.getRetData();
try {
boolean isBandConsume = false;
boolean isNotAllocate = false;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
index b8c9190..fd1318f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicCtrlHandler.java
@@ -88,7 +88,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// query matched records
Map<String, TopicCtrlEntity> topicCtrlMap =
metaDataManager.getTopicCtrlConf(topicNameSet, qryEntity);
@@ -176,7 +176,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// delete records
List<TopicProcessResult> retInfo = new ArrayList<>();
for (String topicName : topicNameSet) {
@@ -208,7 +208,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// get topicNameId info
int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
if (topicNameSet.size() == 1) {
@@ -225,7 +225,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Boolean enableTopicAuth = (Boolean) result.retData1;
+ Boolean enableTopicAuth = (Boolean) result.getRetData();
// check and get max message size
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
@@ -241,7 +241,6 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
}
maxMsgSizeMB = (int) result.getRetData();
// add or update records
- TopicProcessResult retItem;
List<TopicProcessResult> retInfo = new ArrayList<>();
for (String topicName : topicNameSet) {
retInfo.add(metaDataManager.addOrUpdTopicCtrlConf(isAddOp, opEntity,
@@ -288,7 +287,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
return result.success;
}
List<Map<String, String>> ctrlJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
// get default max message size
ClusterSettingEntity defClusterSetting =
metaDataManager.getClusterDefSetting(false);
@@ -310,7 +309,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
return result.success;
}
- String topicName = (String) result.retData1;
+ String topicName = (String) result.getRetData();
// check max message size
if (!WebParameterUtils.getIntParamValue(itemConfMap,
WebFieldDef.MAXMSGSIZEINMB, false,
@@ -331,7 +330,7 @@ public class WebTopicCtrlHandler extends AbstractWebHandler {
false, (isAddOp ? false : null), sBuffer, result)) {
return result.isSuccess();
}
- Boolean enableTopicAuth = (Boolean) result.retData1;
+ Boolean enableTopicAuth = (Boolean) result.getRetData();
itemConf = new TopicCtrlEntity(itemOpEntity, topicName);
itemConf.updModifyInfo(itemOpEntity.getDataVerId(),
itemTopicNameId, itemMaxMsgSizeMB, enableTopicAuth);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index b81495a..642b568 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -215,14 +215,14 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
Map<Integer, List<TopicDeployEntity>> queryResult =
metaDataManager.getTopicDeployInfoMap(topicNameSet, brokerIdSet);
// build query result
@@ -309,7 +309,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIds = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIds = (Set<Integer>) result.getRetData();
Map<Integer, Set<String>> brokerTopicConfigMap =
metaDataManager.getBrokerTopicConfigInfo(brokerIds);
// build query result
@@ -348,13 +348,13 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
if (!WebParameterUtils.getBooleanParamValue(req,
WebFieldDef.WITHIP, false, false, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- boolean withIp = (Boolean) result.retData1;
+ boolean withIp = (Boolean) result.getRetData();
Map<String, Map<Integer, String>> topicBrokerConfigMap =
metaDataManager.getTopicBrokerConfigInfo(topicNameSet);
// build query result
@@ -410,14 +410,14 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// check and get brokerId field
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
// get brokerPort field
if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, result)) {
@@ -719,14 +719,14 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// check and get brokerId info
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
// get and valid TopicPropGroup info
if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
@@ -789,7 +789,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
return result.success;
}
List<Map<String, String>> deployJsonArray =
- (List<Map<String, String>>) result.retData1;
+ (List<Map<String, String>>) result.getRetData();
TopicDeployEntity itemConf;
Map<String, TopicDeployEntity> addRecordMap = new HashMap<>();
// check and get topic deployment configure
@@ -806,7 +806,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebFieldDef.TOPICNAME, true, "", sBuffer, result)) {
return result.success;
}
- String topicName = (String) result.retData1;
+ String topicName = (String) result.getRetData();
// get broker configure info
if (!getBrokerConfInfo(confMap, sBuffer, result)) {
return result.isSuccess();
@@ -920,14 +920,14 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> topicNameSet = (Set<String>) result.getRetData();
// check and get brokerId info
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.COMPSBROKERID, true, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
- Set<Integer> brokerIdSet = (Set<Integer>) result.retData1;
+ Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
// modify record status
List<TopicProcessResult> retInfo = new ArrayList<>();
for (Integer brokerId : brokerIdSet) {