You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/20 01:39:56 UTC
[incubator-inlong] branch INLONG-570 updated: [INLONG-1447] Fix
Group Control API logic bug (#1450)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch INLONG-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-570 by this push:
new f99433e [INLONG-1447] Fix Group Control API logic bug (#1450)
f99433e is described below
commit f99433e1f6f90e031023e05556962f08ceba694d
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Aug 20 09:39:48 2021 +0800
[INLONG-1447] Fix Group Control API logic bug (#1450)
---
.../corebase/policies/FlowCtrlRuleHandler.java | 315 +++++++++++++++------
.../inlong/tubemq/corebase/utils/TStringUtils.java | 6 +-
.../tubemq/server/common/fielddef/WebFieldDef.java | 5 +-
.../bdbentitys/BdbGroupFlowCtrlEntity.java | 1 -
.../server/master/metamanage/MetaDataManager.java | 18 +-
.../dao/entity/GroupConsumeCtrlEntity.java | 7 +-
.../metastore/dao/entity/TopicCtrlEntity.java | 18 +-
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 23 +-
.../bdbimpl/BdbGroupConsumeCtrlMapperImpl.java | 1 -
.../web/handler/WebAdminGroupCtrlHandler.java | 24 +-
.../web/handler/WebGroupConsumeCtrlHandler.java | 8 +-
.../master/web/handler/WebGroupResCtrlHandler.java | 12 +-
.../master/web/handler/WebTopicDeployHandler.java | 4 +-
13 files changed, 294 insertions(+), 148 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index ca45b48..8039536 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.corebase.policies;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.ArrayList;
@@ -312,8 +313,9 @@ public class FlowCtrlRuleHandler {
/**
- * @param flowCtrlInfo
- * @return
+ * Parse FlowCtrlInfo value
+ * @param flowCtrlInfo flowCtrlInfo json value
+ * @return parse result
* @throws Exception
*/
public Map<Integer, List<FlowCtrlItem>> parseFlowCtrlInfo(final String flowCtrlInfo)
@@ -335,18 +337,29 @@ public class FlowCtrlRuleHandler {
return flowCtrlMap;
}
try {
+ int recordNo;
List<FlowCtrlItem> flowCtrlItemList;
for (int i = 0; i < objArray.size(); i++) {
- JsonObject jsonObject = objArray.get(i).getAsJsonObject();
+ JsonElement jsonItem = objArray.get(i);
+ if (jsonItem == null) {
+ continue;
+ }
+ recordNo = i + 1;
+ JsonObject jsonObject = jsonItem.getAsJsonObject();
+ if (!jsonObject.has("type")) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD type is required in record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
int typeVal = jsonObject.get("type").getAsInt();
if (typeVal < 0 || typeVal > 3) {
throw new Exception(new StringBuilder(512)
- .append("type value must in [0,1,3] in index(")
- .append(i).append(") of flowCtrlInfo value!").toString());
+ .append("the value of FIELD type must in [0,1,3] in record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
switch (typeVal) {
case 1:
- flowCtrlItemList = parseFreqLimit(typeVal, jsonObject);
+ flowCtrlItemList = parseFreqLimit(recordNo, typeVal, jsonObject);
break;
case 2: /* Deprecated */
@@ -354,13 +367,13 @@ public class FlowCtrlRuleHandler {
break;
case 3:
- flowCtrlItemList = parseLowFetchLimit(typeVal, jsonObject);
+ flowCtrlItemList = parseLowFetchLimit(recordNo, typeVal, jsonObject);
break;
case 0:
default:
typeVal = 0;
- flowCtrlItemList = parseDataLimit(typeVal, jsonObject);
+ flowCtrlItemList = parseDataLimit(recordNo, typeVal, jsonObject);
break;
}
if (flowCtrlItemList != null && !flowCtrlItemList.isEmpty()) {
@@ -368,79 +381,108 @@ public class FlowCtrlRuleHandler {
}
}
} catch (Throwable e2) {
- throw new Exception(new StringBuilder(512).append("Parse flow-ctrl rule failure, ")
+ throw new Exception(new StringBuilder(512)
+ .append("Parse flowCtrlInfo value failure, ")
.append(e2.getMessage()).toString());
}
return flowCtrlMap;
}
-
/**
* lizard forgives
*
- * @param typeVal
- * @param jsonObject
- * @return
+ * Parse data consumption limit rule info
+ *
+ * @param recordNo record no
+ * @param typeVal type value
+ * @param jsonObject record json value
+ * @return parsed result
* @throws Exception
*/
- private List<FlowCtrlItem> parseDataLimit(int typeVal, JsonObject jsonObject) throws Exception {
+ private List<FlowCtrlItem> parseDataLimit(int recordNo, int typeVal,
+ JsonObject jsonObject) throws Exception {
if (jsonObject == null || jsonObject.get("type").getAsInt() != 0) {
- throw new Exception("parse data limit rule failure!");
+ throw new Exception(new StringBuilder(512)
+ .append("parse data_limit rule failure in record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
+ if (!jsonObject.has("rule")) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD rule is required in data_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
JsonArray ruleArray = jsonObject.get("rule").getAsJsonArray();
if (ruleArray == null) {
- throw new Exception("not found rule list in data limit!");
+ throw new Exception(new StringBuilder(512)
+ .append("emtpy rule define in data_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
+ // parse rule item
+ int itemNo;
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
+ itemNo = index + 1;
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
- int startTime = validAndGetTimeValue("start",
- ruleObject.get("start").getAsString(), index, "data");
- int endTime = validAndGetTimeValue("end",
- ruleObject.get("end").getAsString(), index, "data");
+ int startTime = validAndGetTimeValue(ruleObject, "start", itemNo, recordNo);
+ int endTime = validAndGetTimeValue(ruleObject, "end", itemNo, recordNo);
if (startTime >= endTime) {
throw new Exception(new StringBuilder(512)
- .append("start value must lower than the End value in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("the value of FIELD start must lower than the value FIELD end ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (!ruleObject.has("dltInM")) {
throw new Exception(new StringBuilder(512)
- .append("dltInM key is required in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("FIELD dltInM is required in data_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
long dltVal = ruleObject.get("dltInM").getAsLong();
if (dltVal <= 20) {
throw new Exception(new StringBuilder(512)
- .append("dltInM value must be greater than 20 in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("the value of FIELD dltInM must be greater than 20 ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (!ruleObject.has("limitInM")) {
throw new Exception(new StringBuilder(512)
- .append("limitInM key is required in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("FIELD limitInM is required in data_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
long dataLimitInM = ruleObject.get("limitInM").getAsLong();
if (dataLimitInM < 0) {
throw new Exception(new StringBuilder(512)
- .append("limitInM value must be greater than or equal to zero in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("the value of FIELD limitInM must be greater than or equal to 0 ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
dataLimitInM = dataLimitInM * 1024 * 1024;
if (!ruleObject.has("freqInMs")) {
throw new Exception(new StringBuilder(512)
- .append("freqInMs key is required in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("FIELD freqInMs is required in data_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
int freqInMs = ruleObject.get("freqInMs").getAsInt();
if (freqInMs < 200) {
throw new Exception(new StringBuilder(512)
- .append("freqInMs value must be greater than or equal to 200 in index(")
- .append(index).append(") of data limit rule!").toString());
+ .append("the value of FIELD freqInMs must be greater than or equal to 200 ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
flowCtrlItems.add(new FlowCtrlItem(typeVal,
startTime, endTime, dltVal, dataLimitInM, freqInMs));
}
-
+ if (flowCtrlItems.isEmpty()) {
+ throw new Exception(new StringBuilder(512)
+ .append("not found valid rule define in data_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
Collections.sort(flowCtrlItems, new Comparator<FlowCtrlItem>() {
@Override
public int compare(final FlowCtrlItem o1, final FlowCtrlItem o2) {
@@ -457,48 +499,73 @@ public class FlowCtrlRuleHandler {
}
/**
- * @param typeVal
- * @param jsonObject
- * @return
+ * Parse frequent limit rule info
+ *
+ * @param recordNo record no
+ * @param typeVal type value
+ * @param jsonObject record json value
+ * @return parsed result
* @throws Exception
*/
- private List<FlowCtrlItem> parseFreqLimit(int typeVal,
+ private List<FlowCtrlItem> parseFreqLimit(int recordNo, int typeVal,
JsonObject jsonObject) throws Exception {
if (jsonObject == null || jsonObject.get("type").getAsInt() != 1) {
- throw new Exception("parse freq limit rule failure!");
+ throw new Exception(new StringBuilder(512)
+ .append("parse freq_limit rule failure in record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
+ if (!jsonObject.has("rule")) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD rule is required in freq_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
JsonArray ruleArray = jsonObject.get("rule").getAsJsonArray();
if (ruleArray == null) {
- throw new Exception("not found rule list in freq limit!");
+ throw new Exception(new StringBuilder(512)
+ .append("emtpy rule define in freq_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
+ int itemNo;
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
+ itemNo = index + 1;
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
if (!ruleObject.has("zeroCnt")) {
throw new Exception(new StringBuilder(512)
- .append("zeroCnt key is required in index(")
- .append(index).append(") of freq limit rule!").toString());
+ .append("FIELD zeroCnt is required in freq_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
int zeroCnt = ruleObject.get("zeroCnt").getAsInt();
if (zeroCnt < 1) {
throw new Exception(new StringBuilder(512)
- .append("zeroCnt value must be greater than or equal to 1 in index(")
- .append(index).append(") of freq limit rule!").toString());
+ .append("the value of FIELD zeroCnt must be greater than or equal to 1 ")
+ .append("in freq_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (!ruleObject.has("freqInMs")) {
throw new Exception(new StringBuilder(512)
- .append("freqInMs key is required in index(")
- .append(index).append(") of freq limit rule!").toString());
+ .append("FIELD freqInMs is required in freq_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
int freqInMs = ruleObject.get("freqInMs").getAsInt();
if (freqInMs < 0) {
throw new Exception(new StringBuilder(512)
- .append("freqInMs value must be greater than or equal to zero in index(")
- .append(index).append(") of freq limit rule!").toString());
+ .append("the value of FIELD freqInMs must be greater than or equal to 0 ")
+ .append("in freq_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
flowCtrlItems.add(new FlowCtrlItem(typeVal, zeroCnt, freqInMs));
}
-
+ if (flowCtrlItems.isEmpty()) {
+ throw new Exception(new StringBuilder(512)
+ .append("not found valid rule define in freq_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
+ // sort rule set by the value of FIELD zeroCnt
Collections.sort(flowCtrlItems, new Comparator<FlowCtrlItem>() {
@Override
public int compare(final FlowCtrlItem o1, final FlowCtrlItem o2) {
@@ -515,67 +582,105 @@ public class FlowCtrlRuleHandler {
}
/**
- * @param typeVal
- * @param jsonObject
- * @return
+ * Parse low frequent fetch count
+ * @param recordNo record no
+ * @param typeVal type value
+ * @param jsonObject record json value
+ * @return parsed result
* @throws Exception
*/
- private List<FlowCtrlItem> parseLowFetchLimit(int typeVal,
+ private List<FlowCtrlItem> parseLowFetchLimit(int recordNo, int typeVal,
JsonObject jsonObject) throws Exception {
if (jsonObject == null || jsonObject.get("type").getAsInt() != 3) {
- throw new Exception("parse low fetch limit rule failure!");
+ throw new Exception(new StringBuilder(512)
+ .append("parse low_fetch_limit rule failure in record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
+ if (!jsonObject.has("rule")) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD rule is required in low_fetch_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
JsonArray ruleArray = jsonObject.get("rule").getAsJsonArray();
if (ruleArray == null) {
- throw new Exception("not found rule list in low fetch limit!");
+ throw new Exception(new StringBuilder(512)
+ .append("emtpy rule define in low_fetch_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
if (ruleArray.size() > 1) {
- throw new Exception("only allow set one rule in low fetch limit!");
+ throw new Exception(new StringBuilder(512)
+ .append("only allow set one rule in low_fetch_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
}
+ int itemNo;
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
+ // parse low_fetch_limit rule record
for (int index = 0; index < ruleArray.size(); index++) {
+ itemNo = index + 1;
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int normfreqInMs = 0;
int filterFreqInMs = 0;
int minDataFilterFreqInMs = 0;
if (ruleObject.has("filterFreqInMs")
|| ruleObject.has("minDataFilterFreqInMs")) {
+ if (!ruleObject.has("filterFreqInMs")) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD filterFreqInMs is required ")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
+ }
filterFreqInMs = ruleObject.get("filterFreqInMs").getAsInt();
if (filterFreqInMs < 0 || filterFreqInMs > 300000) {
throw new Exception(new StringBuilder(512)
- .append("filterFreqInMs value must in [0, 300000] in index(")
- .append(index).append(") of low fetch limit rule!").toString());
+ .append("the value of FIELD filterFreqInMs must in [0, 300000] ")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (!ruleObject.has("minDataFilterFreqInMs")) {
throw new Exception(new StringBuilder(512)
- .append("minDataFilterFreqInMs key is required in index(")
- .append(index).append(") of low fetch limit rule!").toString());
+ .append("FIELD minDataFilterFreqInMs is required ")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
minDataFilterFreqInMs = ruleObject.get("minDataFilterFreqInMs").getAsInt();
if (minDataFilterFreqInMs < 0 || minDataFilterFreqInMs > 300000) {
throw new Exception(new StringBuilder(512)
- .append("minDataFilterFreqInMs value must in [0, 300000] in index(")
- .append(index).append(") of low fetch limit rule!").toString());
+ .append("the value of FIELD minDataFilterFreqInMs must in [0, 300000] ")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (minDataFilterFreqInMs < filterFreqInMs) {
throw new Exception(new StringBuilder(512)
- .append("minDataFilterFreqInMs value must be greater than ")
- .append("or equal to filterFreqInMs value in index(")
- .append(index).append(") of low fetch limit rule!").toString());
+ .append("the value of FIELD minDataFilterFreqInMs must be greater ")
+ .append("than or equal to the value of FIELD filterFreqInMs")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
}
if (ruleObject.has("normFreqInMs")) {
normfreqInMs = ruleObject.get("normFreqInMs").getAsInt();
if (normfreqInMs < 0 || normfreqInMs > 300000) {
throw new Exception(new StringBuilder(512)
- .append("normFreqInMs value must in [0, 300000] in index(")
- .append(index).append(") of low fetch limit rule!").toString());
+ .append("the value of FIELD normFreqInMs must in [0, 300000] ")
+ .append("in low_fetch_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
}
flowCtrlItems.add(new FlowCtrlItem(typeVal,
normfreqInMs, filterFreqInMs, minDataFilterFreqInMs));
}
-
+ if (flowCtrlItems.isEmpty()) {
+ throw new Exception(new StringBuilder(512)
+ .append("not found valid rule define in low_fetch_limit record(")
+ .append(recordNo).append(") of flowCtrlInfo value!").toString());
+ }
+ // sort rule set by the value of filterFreqInMs
Collections.sort(flowCtrlItems, new Comparator<FlowCtrlItem>() {
@Override
public int compare(final FlowCtrlItem o1, final FlowCtrlItem o2) {
@@ -596,21 +701,31 @@ public class FlowCtrlRuleHandler {
return this.strFlowCtrlInfo;
}
-
/**
- * @param strValName
- * @param strTimeVal
- * @param index
- * @param ruleType
- * @return
+ * Parse time information
+ * @param ruleObject rule value object
+ * @param fieldName field name
+ * @param itemNo rule no
+ * @param recordNo record no
+ * @return parse result
* @throws Exception
*/
- private int validAndGetTimeValue(final String strValName,
- final String strTimeVal,
- int index, final String ruleType) throws Exception {
+ private int validAndGetTimeValue(JsonObject ruleObject, String fieldName,
+ int itemNo, int recordNo) throws Exception {
+ if (!ruleObject.has(fieldName)) {
+ throw new Exception(new StringBuilder(512)
+ .append("FIELD ").append(fieldName).append(" is required ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
+ }
+ String strTimeVal = ruleObject.get(fieldName).getAsString();
if (TStringUtils.isBlank(strTimeVal)) {
- throw new Exception(strValName + " value is null or blank of "
- + ruleType + " limit rule!");
+ throw new Exception(new StringBuilder(512)
+ .append("the value of FIELD ").append(fieldName)
+ .append(" is null or blank in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
int timeHour = 0;
int timeMin = 0;
@@ -618,34 +733,48 @@ public class FlowCtrlRuleHandler {
if ((startItems.length != 2)
|| TStringUtils.isBlank(startItems[0])
|| TStringUtils.isBlank(startItems[1])) {
- throw new Exception("illegal format, " + strValName
- + " value must be 'aa:bb' and 'aa','bb' must be int value format in "
- + ruleType + " limit rule!");
+ throw new Exception(new StringBuilder(512)
+ .append("illegal format, the value of FIELD ").append(fieldName)
+ .append(" must be 'aa:bb' and 'aa','bb' must be int value ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
try {
timeHour = Integer.parseInt(startItems[0]);
} catch (Throwable e2) {
- throw new Exception("illegal format, " + strValName
- + " value must be 'aa:bb' and 'aa' must be int value in "
- + ruleType + " limit rule!");
+ throw new Exception(new StringBuilder(512)
+ .append("illegal format, the value of FIELD ").append(fieldName)
+ .append(" must be 'aa:bb' and 'aa' must be int value ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
try {
timeMin = Integer.parseInt(startItems[1]);
} catch (Throwable e2) {
- throw new Exception("illegal format, " + strValName
- + " value must be 'aa:bb' and 'bb' must be int value in "
- + ruleType + " limit rule!");
+ throw new Exception(new StringBuilder(512)
+ .append("illegal format, the value of FIELD ").append(fieldName)
+ .append(" must be 'aa:bb' and 'bb' must be int value ")
+ .append("in data_limit item(").append(itemNo)
+ .append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (timeHour < 0 || timeHour > 24) {
throw new Exception(new StringBuilder(512)
- .append(strValName).append("-hour value must in [0,23] in index(")
- .append(index).append(") of ").append(ruleType).append(" limit rule!").toString());
+ .append("illegal value, the value of FIELD ").append(fieldName)
+ .append("-hour value must in [0,23] in data_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
if (timeMin < 0 || timeMin > 59) {
throw new Exception(new StringBuilder(512)
- .append(strValName).append("-minute value must in [0,59] in index(")
- .append(index).append(") of ").append(ruleType).append(" limit rule!").toString());
+ .append("illegal value, the value of FIELD ").append(fieldName)
+ .append("-minute value must in [0,59] in data_limit item(")
+ .append(itemNo).append(").record(").append(recordNo)
+ .append(") of flowCtrlInfo value!").toString());
}
return timeHour * 100 + timeMin;
}
+
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
index 5fae646..b3146db 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/TStringUtils.java
@@ -188,7 +188,11 @@ public class TStringUtils {
if (isNotBlank(attrItem)) {
String[] kv = attrItem.split(TokenConstants.EQ);
if (attrKey.equals(kv[0])) {
- return kv[1];
+ if (kv.length == 1) {
+ return "";
+ } else {
+ return kv[1];
+ }
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
index c73b4ea..aff3b6e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
@@ -190,8 +190,9 @@ public enum WebFieldDef {
ALWDBCRATE(65, "alwdBrokerClientRate", "abcr", WebFieldType.INT,
"Allowed broker client rate", RegexDef.TMP_NUMBER),
- REASON(66, "reason", "rsn", WebFieldType.STRING,
- "Reason", TBaseConstants.META_MAX_OPREASON_LENGTH, RegexDef.TMP_STRING),
+ DSBCSMREASON(66, "disableCsmRsn", "dsCsmRsn", WebFieldType.STRING,
+ "Reasons for disable consumption",
+ TBaseConstants.META_MAX_OPREASON_LENGTH, RegexDef.TMP_STRING),
FILTERENABLE(67, "filterEnable", "fltEn",
WebFieldType.BOOLEAN, "Filter consume enable status"),
MANAGESTATUS(68, "manageStatus", "mSts",
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
index 0c6b066..a2fe0be 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index 98878d5..a74f1c9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -985,8 +985,8 @@ public class MetaDataManager implements Server {
return new TopicProcessResult(deployEntity.getBrokerId(), "", result);
}
// add topic control configure
- if (!addIfAbsentTopicCtrlConf(deployEntity.getTopicName(),
- deployEntity.getModifyUser(), sBuffer, result)) {
+ if (!addIfAbsentTopicCtrlConf(deployEntity,
+ deployEntity.getTopicName(), sBuffer, result)) {
return new TopicProcessResult(deployEntity.getBrokerId(),
deployEntity.getTopicName(), result);
}
@@ -1489,13 +1489,13 @@ public class MetaDataManager implements Server {
/**
* Add if absent topic control configure info
*
+ * @param opEntity the operation info
* @param topicName the topic name will be add
- * @param operator the topic name id will be add
* @param sBuffer the print info string buffer
* @param result the process result return
* @return true if success otherwise false
*/
- public boolean addIfAbsentTopicCtrlConf(String topicName, String operator,
+ public boolean addIfAbsentTopicCtrlConf(BaseEntity opEntity, String topicName,
StringBuilder sBuffer, ProcessResult result) {
int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
ClusterSettingEntity defSetting = getClusterDefSetting(false);
@@ -1505,8 +1505,8 @@ public class MetaDataManager implements Server {
TopicCtrlEntity curEntity =
metaStoreService.getTopicCtrlConf(topicName);
if (curEntity == null) {
- curEntity = new TopicCtrlEntity(topicName,
- TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB, operator);
+ curEntity = new TopicCtrlEntity(opEntity, topicName,
+ TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB);
metaStoreService.addTopicCtrlConf(curEntity, sBuffer, result);
} else {
result.setSuccResult(null);
@@ -1799,8 +1799,7 @@ public class MetaDataManager implements Server {
if (!addIfAbsentGroupResConf(entity, entity.getGroupName(), sBuffer, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
- if (!addIfAbsentTopicCtrlConf(entity.getTopicName(),
- entity.getModifyUser(), sBuffer, result)) {
+ if (!addIfAbsentTopicCtrlConf(entity, entity.getTopicName(), sBuffer, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
GroupConsumeCtrlEntity curEntity =
@@ -1876,8 +1875,7 @@ public class MetaDataManager implements Server {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
// add topic control record
- if (!addIfAbsentTopicCtrlConf(entity.getTopicName(),
- entity.getModifyUser(), sBuffer, result)) {
+ if (!addIfAbsentTopicCtrlConf(entity, entity.getTopicName(), sBuffer, result)) {
return new GroupProcessResult(entity.getGroupName(), entity.getTopicName(), result);
}
GroupConsumeCtrlEntity newEntity;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index 791c7e6..8c0897b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -48,8 +48,7 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
public GroupConsumeCtrlEntity(BaseEntity opInfoEntity,
String groupName, String topicName) {
super(opInfoEntity);
- this.groupName = groupName;
- this.topicName = topicName;
+ setGroupAndTopic(groupName, topicName);
}
/**
@@ -261,14 +260,14 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
.append(",\"groupName\":\"").append(groupName).append("\"")
.append(",\"consumeEnable\":").append(consumeEnable.isEnable())
- .append(",\"disableReason\":\"").append(disableReason).append("\"")
+ .append(",\"disableCsmRsn\":\"").append(disableReason).append("\"")
.append(",\"filterEnable\":").append(filterEnable.isEnable())
.append(",\"filterConds\":\"").append(tmpFilterConds).append("\"");
} else {
sBuilder.append("{\"topic\":\"").append(topicName).append("\"")
.append(",\"group\":\"").append(groupName).append("\"")
.append(",\"csmEn\":").append(consumeEnable.isEnable())
- .append(",\"dsRsn\":\"").append(disableReason).append("\"")
+ .append(",\"dsCsmRsn\":\"").append(disableReason).append("\"")
.append(",\"fltEn\":").append(filterEnable.isEnable())
.append(",\"fltRls\":\"").append(tmpFilterConds).append("\"");
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 2a9d0d0..74986e8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -17,7 +17,6 @@
package org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity;
-import java.util.Date;
import java.util.Objects;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
@@ -44,10 +43,16 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
super();
}
+ public TopicCtrlEntity(BaseEntity opEntity, String topicName) {
+ super(opEntity);
+ this.topicName = topicName;
+ }
- public TopicCtrlEntity(String topicName, int topicNameId,
- int maxMsgSizeInMB, String createUser) {
- super(createUser, new Date());
+ public TopicCtrlEntity(BaseEntity opEntity, String topicName,
+ int topicNameId, int maxMsgSizeInMB) {
+ super(opEntity.getDataVerId(),
+ opEntity.getModifyUser(),
+ opEntity.getModifyDate());
this.topicName = topicName;
this.topicNameId = topicNameId;
this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
@@ -56,11 +61,6 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable {
this.maxMsgSizeInMB = maxMsgSizeInMB;
}
- public TopicCtrlEntity(BaseEntity opEntity, String topicName) {
- super(opEntity);
- this.topicName = topicName;
- }
-
/**
* Constructor by BdbTopicAuthControlEntity
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 4bc5c5b..c9af1bc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -211,9 +211,24 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> brokerIdSet,
Set<String> brokerIpSet,
BrokerConfEntity qryEntity) {
+ Set<Integer> idHitSet = null;
Set<Integer> ipHitSet = null;
Set<Integer> totalMatchedSet = null;
Map<Integer, BrokerConfEntity> retMap = new HashMap<>();
+ // get records set by brokerIdSet
+ if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
+ idHitSet = new HashSet<>();
+ BrokerConfEntity entity;
+ for (Integer brokerId : brokerIdSet) {
+ entity = brokerConfCache.get(brokerId);
+ if (entity != null) {
+ idHitSet.add(brokerId);
+ }
+ }
+ if (idHitSet.isEmpty()) {
+ return retMap;
+ }
+ }
// get records set by brokerIpSet
if (brokerIpSet != null && !brokerIpSet.isEmpty()) {
ipHitSet = new HashSet<>();
@@ -228,15 +243,15 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
}
}
// get intersection from brokerIdSet and brokerIpSet
- if (brokerIdSet != null || ipHitSet != null) {
- if (brokerIdSet == null) {
+ if (idHitSet != null || ipHitSet != null) {
+ if (idHitSet == null) {
totalMatchedSet = new HashSet<>(ipHitSet);
} else {
if (ipHitSet == null) {
- totalMatchedSet = new HashSet<>(brokerIdSet);
+ totalMatchedSet = new HashSet<>(idHitSet);
} else {
totalMatchedSet = new HashSet<>();
- for (Integer record : brokerIdSet) {
+ for (Integer record : idHitSet) {
if (ipHitSet.contains(record)) {
totalMatchedSet.add(record);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
index 87c6fbb..bab4742 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
-import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index d0b5aba..b137a69 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Deprecated
public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
private static final Logger logger =
@@ -201,14 +202,14 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
sBuffer.append("{\"topicName\":\"").append(entry.getTopicName())
.append("\",\"groupName\":\"").append(entry.getGroupName())
- .append(",\"dataVersionId\":").append(entry.getDataVerId())
- .append(",\"createUser\":\"").append(entry.getCreateUser()).append("\"")
- .append(",\"createDate\":\"").append(entry.getCreateDateStr()).append("\"")
- .append(",\"modifyUser\":\"").append(entry.getModifyUser()).append("\"")
- .append(",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
+ .append("\",\"dataVersionId\":").append(entry.getDataVerId())
+ .append(",\"createUser\":\"").append(entry.getCreateUser())
+ .append("\",\"createDate\":\"").append(entry.getCreateDateStr())
+ .append("\",\"modifyUser\":\"").append(entry.getModifyUser())
+ .append("\",\"modifyDate\":\"").append(entry.getModifyDateStr()).append("\"}");
}
- WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
}
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
}
@@ -860,8 +861,8 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
if (totalCnt++ > 0) {
sBuffer.append(",");
}
- sBuffer.append("{\"groupName\":\"").append(entry.getGroupName()).append("\"")
- .append(",\"success\":").append(entry.isSuccess())
+ sBuffer.append("{\"groupName\":\"").append(entry.getGroupName())
+ .append("\",\"success\":").append(entry.isSuccess())
.append(",\"errCode\":").append(entry.getErrCode())
.append(",\"errInfo\":\"").append(entry.getErrInfo()).append("\"}");
}
@@ -1227,14 +1228,15 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
}
int paramValue = (int) result.getRetData();
if (paramValue == TBaseConstants.META_VALUE_UNDEFINED) {
- return defValue;
+ result.setSuccResult(defValue);
} else {
if (paramValue == 2) {
- return Boolean.TRUE;
+ result.setSuccResult(Boolean.TRUE);
} else {
- return Boolean.FALSE;
+ result.setSuccResult(Boolean.FALSE);
}
}
+ return result.isSuccess();
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
index 5295811..9db1429 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupConsumeCtrlHandler.java
@@ -265,9 +265,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
Boolean consumeEnable = (Boolean) result.getRetData();
- // get disableReason list
+ // get disableCsmRsn info
if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.REASON, false,
+ WebFieldDef.DSBCSMREASON, false,
(isAddOp ? "" : null), sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
@@ -386,9 +386,9 @@ public class WebGroupConsumeCtrlHandler extends AbstractWebHandler {
return result.isSuccess();
}
final Boolean consumeEnable = (Boolean) result.getRetData();
- // get disableReason list
+ // get disableCsmRsn info
if (!WebParameterUtils.getStringParamValue(itemsMap,
- WebFieldDef.REASON, false, (isAddOp ? "" : null), sBuffer, result)) {
+ WebFieldDef.DSBCSMREASON, false, (isAddOp ? "" : null), sBuffer, result)) {
return result.isSuccess();
}
final String disableRsn = (String) result.getRetData();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
index 10e664f..b4536e1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebGroupResCtrlHandler.java
@@ -71,9 +71,9 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
StringBuilder sBuffer,
ProcessResult result) {
// build query entity
- GroupResCtrlEntity entity = new GroupResCtrlEntity();
+ GroupResCtrlEntity qryEntity = new GroupResCtrlEntity();
// get queried operation info, for createUser, modifyUser, dataVersionId
- if (!WebParameterUtils.getQueriedOperateInfo(req, entity, sBuffer, result)) {
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, result)) {
WebParameterUtils.buildFailResult(sBuffer, result.errInfo);
return sBuffer;
}
@@ -106,16 +106,16 @@ public class WebGroupResCtrlHandler extends AbstractWebHandler {
return sBuffer;
}
Boolean flowCtrlEnable = (Boolean) result.getRetData();
- entity.updModifyInfo(entity.getDataVerId(),
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(),
resCheckEnable, TBaseConstants.META_VALUE_UNDEFINED, inQryPriorityId,
flowCtrlEnable, TBaseConstants.META_VALUE_UNDEFINED, null);
Map<String, GroupResCtrlEntity> groupResCtrlEntityMap =
- metaDataManager.confGetGroupResCtrlConf(inGroupSet, entity);
+ metaDataManager.confGetGroupResCtrlConf(inGroupSet, qryEntity);
// build return result
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
- for (GroupResCtrlEntity resCtrlEntity : groupResCtrlEntityMap.values()) {
- if (resCtrlEntity == null) {
+ for (GroupResCtrlEntity entity : groupResCtrlEntityMap.values()) {
+ if (entity == null) {
continue;
}
if (totalCnt++ > 0) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 7f1ab75..9dfbf41 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -747,9 +747,9 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
if (countJ++ > 0) {
sBuffer.append(",");
}
- groupEntity.toWebJsonStr(sBuffer, true, false);
+ groupEntity.toWebJsonStr(sBuffer, true, true);
}
- sBuffer.append("],\"groupCount\":").append(countJ);
+ sBuffer.append("],\"groupAuthCount\":").append(countJ);
}
sBuffer.append("}");
}