You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/03/08 09:03:04 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-571] Adjust
WebOtherInfoHandler class implementation
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 4b61019 [TUBEMQ-571] Adjust WebOtherInfoHandler class implementation
4b61019 is described below
commit 4b6101993c4131a1a2ad2fdd32d70e76e7f7fcb9
Author: gosonzhang <go...@tencent.com>
AuthorDate: Sat Mar 6 20:44:10 2021 +0800
[TUBEMQ-571] Adjust WebOtherInfoHandler class implementation
---
.../org/apache/tubemq/corebase/utils/Tuple2.java | 11 ++
.../tubemq/server/common/fielddef/WebFieldDef.java | 72 ++++++++-
.../common/{webbase => fielddef}/WebFieldType.java | 7 +-
.../server/common/utils/WebParameterUtils.java | 9 ++
.../org/apache/tubemq/server/master/TMaster.java | 2 +-
.../nodemanage/nodebroker/TopicPSInfoManager.java | 66 ++++++++
.../nodemanage/nodeconsumer/ConsumerBandInfo.java | 12 +-
.../nodeconsumer/ConsumerInfoHolder.java | 76 +++++++--
.../server/master/web/action/screen/Master.java | 14 +-
.../master/web/handler/WebOtherInfoHandler.java | 179 ++++++++++-----------
.../nodebroker/TopicPSInfoManagerTest.java | 2 +-
11 files changed, 331 insertions(+), 119 deletions(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
index 048452f..c9f16e5 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
@@ -58,4 +58,15 @@ public class Tuple2<T0, T1> {
public T1 getF1() {
return f1;
}
+
+ /**
+ * Set all field values
+ *
+ * @param value0 The value for field 0
+ * @param value1 The value for field 1
+ */
+ public void setF0AndF1(T0 value0, T1 value1) {
+ this.f0 = value0;
+ this.f1 = value1;
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 68a833b..ba6a10f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -21,7 +21,6 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.RegexDef;
import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.webbase.WebFieldType;
public enum WebFieldDef {
@@ -44,6 +43,7 @@ public enum WebFieldDef {
MODIFYUSER(4, "modifyUser", "mur", WebFieldType.STRING,
"Record modifier", TBaseConstants.META_MAX_USERNAME_LENGTH,
RegexDef.TMP_STRING),
+
MANUALOFFSET(5, "manualOffset", "offset", WebFieldType.LONG,
"Reset offset value", RegexDef.TMP_NUMBER),
MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT,
@@ -55,6 +55,7 @@ public enum WebFieldDef {
"Require return disk offset details"),
NEEDREFRESH(9, "needRefresh", "nrf", WebFieldType.BOOLEAN,
"Require refresh data"),
+
COMPSGROUPNAME(10, "groupName", "group", WebFieldType.COMPSTRING,
"Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
RegexDef.TMP_GROUP),
@@ -67,6 +68,7 @@ public enum WebFieldDef {
"Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
BROKERID(14, "brokerId", "brokerId", WebFieldType.INT,
"Broker ID", RegexDef.TMP_NUMBER),
+
COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
"Broker ID", RegexDef.TMP_NUMBER),
WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
@@ -79,10 +81,11 @@ public enum WebFieldDef {
TGTCOMPSGROUPNAME(19, "targetGroupName", "tgtGroup",
WebFieldType.COMPSTRING, "Offset clone target group name",
TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
+
MANUALSET(20, "manualSet", "manSet",
WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
OFFSETJSON(21, "offsetJsonInfo", "offsetInfo",
- WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"),
+ WebFieldType.JSONDICT, "The offset info that needs to be added or modified"),
ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN,
"Only clear the offset data in the memory cache, default is false"),
ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING,
@@ -90,6 +93,7 @@ public enum WebFieldDef {
TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH),
MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
"Max allowed message size, unit MB", RegexDef.TMP_NUMBER),
+
CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
"Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
@@ -97,12 +101,73 @@ public enum WebFieldDef {
HOSTNAME(27, "hostName", "hostName", WebFieldType.STRING,
"Host name information", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
CLIENTID(28, "clientId", "clientId", WebFieldType.STRING,
- "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH);
+ "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH),
+ @Deprecated
+ CONSUMEGROUP(29, "consumeGroup", "group", WebFieldType.STRING,
+ "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+ RegexDef.TMP_GROUP),
+
+ @Deprecated
+ COMPSCONSUMEGROUP(30, "consumeGroup", "group", WebFieldType.COMPSTRING,
+ "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+ RegexDef.TMP_GROUP),
+ REGIONID(31, "regionId", "regionId", WebFieldType.INT,
+ "Region id", RegexDef.TMP_NUMBER),
+ COMPREGIONID(32, "regionId", "regionId", WebFieldType.COMPINT,
+ "Region id", RegexDef.TMP_NUMBER),
+ DATAVERSIONID(33, "dataVersionId", "dataVerId", WebFieldType.LONG,
+ "Data version id", RegexDef.TMP_NUMBER),
+ TOPICNAMEID(34, "topicNameId", "topicId", WebFieldType.LONG,
+ "Topic name id", RegexDef.TMP_NUMBER),
+ COMPTOPICNAMEID(35, "topicNameId", "topicId", WebFieldType.COMPLONG,
+ "Topic name id", RegexDef.TMP_NUMBER),
+ NUMTOPICSTORES(36, "numTopicStores", "numStore", WebFieldType.INT,
+ "Number of topic stores", RegexDef.TMP_NUMBER),
+ NUMPARTITIONS(37, "numPartitions", "numPart", WebFieldType.INT,
+ "Number of partitions", RegexDef.TMP_NUMBER),
+ UNFLUSHTHRESHOLD(38, "unflushThreshold", "unfDskMsgCnt", WebFieldType.INT,
+ "Maximum allowed disk unflushing message count", RegexDef.TMP_NUMBER),
+ UNFLUSHINTERVAL(39, "unflushInterval", "unfDskInt", WebFieldType.INT,
+ "Maximum allowed disk unflushing interval", RegexDef.TMP_NUMBER),
+ UNFLUSHDATAHOLD(40, "unflushDataHold", "unfDskDataSize", WebFieldType.INT,
+ "Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
+ MCACHESIZEINMB(41, "memCacheMsgSizeInMB", "cacheSizeInMB", WebFieldType.INT,
+ "Maximum allowed memory cache size in MB", RegexDef.TMP_NUMBER),
+ UNFMCACHECNTINK(42, "memCacheMsgCntInK", "unfMemMsgCnt", WebFieldType.INT,
+ "Maximum allowed memory cache unflushing message count", RegexDef.TMP_NUMBER),
+ UNFMCACHEINTERVAL(43, "memCacheFlushIntvl", "unfMemInt", WebFieldType.INT,
+ "Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
+ MAXMSGSIZEINMB(44, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
+ "Maximum allowed message length", RegexDef.TMP_NUMBER),
+ ACCEPTPUBLISH(45, "acceptPublish", "accPub", WebFieldType.BOOLEAN,
+ "Enable publishing"),
+ ACCEPTSUBSCRIBE(46, "acceptSubscribe", "accSub", WebFieldType.BOOLEAN,
+ "Enable subscription"),
+ DELETEPOLICY(47, "deletePolicy", "delPolicy",
+ WebFieldType.DELPOLICY, "File aging strategy"),
+ TOPICJSONSET(48, "topicJsonSet", "topicSet",
+ WebFieldType.JSONSET, "The topic info set that needs to be added or modified"),
+ BROKERIP(49, "brokerIp", "brokerIp", WebFieldType.STRING,
+ "Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
+ RegexDef.TMP_IPV4ADDRESS),
+ BROKERPORT(50, "brokerPort", "brokerPort", WebFieldType.INT,
+ "Broker port", RegexDef.TMP_NUMBER),
+ BROKERTLSPORT(51, "brokerTLSPort", "brokerTLSPort", WebFieldType.INT,
+ "Broker tls port", RegexDef.TMP_NUMBER),
+ BROKERJSONSET(52, "brokerJsonSet", "brokerSet",
+ WebFieldType.JSONSET, "The broker info set that needs to be added or modified"),
+ STATUSID(53, "statusId", "statusId", WebFieldType.INT,
+ "Status id", RegexDef.TMP_NUMBER),
+ QRYPRIORITYID(54, "qryPriorityId", "qryPriId", WebFieldType.INT,
+ "Query priority id", RegexDef.TMP_NUMBER),
+ FLOWCTRLSET(55, "flowCtrlInfo", "flowCtrlSet",
+ WebFieldType.JSONSET,
+ "The flow control info set that needs to be added or modified");
@@ -174,6 +239,7 @@ public enum WebFieldDef {
public boolean isCompFieldType() {
return (this.type == WebFieldType.COMPINT
+ || this.type == WebFieldType.COMPLONG
|| this.type == WebFieldType.COMPSTRING);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
similarity index 86%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
index a3e037f..7109d9e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.common.webbase;
+package org.apache.tubemq.server.common.fielddef;
@@ -29,7 +29,10 @@ public enum WebFieldType {
DATE(5, "Date"),
COMPSTRING(6, "Compound string"),
COMPINT(7, "Compound integer"),
- JSONTYPE(8, "Json");
+ COMPLONG(8, "Compound long"),
+ JSONDICT(9, "Json dict"),
+ JSONSET(10, "Json set"),
+ DELPOLICY(11, "Delete policy");
private int value;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 161966d..2747efd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -251,6 +251,15 @@ public class WebParameterUtils {
return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
append(appendInfo).append("\"}");
}
+
+ public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder strBuffer) {
+ return strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ }
+ public static StringBuilder buildSuccessWithDataRetEnd(
+ StringBuilder strBuffer, int totalCnt) {
+ return strBuffer.append("],\"count\":").append(totalCnt).append("}");
+ }
+
/**
* Parse the parameter value from an object value to a long value
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index e7327cb..de510a6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -176,7 +176,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.producerHolder = new ProducerInfoHolder();
this.consumerHolder = new ConsumerInfoHolder();
this.consumerEventManager = new ConsumerEventManager(consumerHolder);
- this.topicPSInfoManager = new TopicPSInfoManager();
+ this.topicPSInfoManager = new TopicPSInfoManager(this);
this.loadBalancer = new DefaultLoadBalancer();
this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
false, TBaseConstants.META_VALUE_UNDEFINED);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 2b0ffc9..19371a1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -29,12 +29,15 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
/**
* Topic Publication/Subscription info management
*/
public class TopicPSInfoManager {
+ private final TMaster master;
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
new ConcurrentHashMap<>();
@@ -45,6 +48,10 @@ public class TopicPSInfoManager {
ConcurrentHashSet<String/* group */>> topicSubInfoMap =
new ConcurrentHashMap<>();
+ public TopicPSInfoManager(TMaster master) {
+ this.master = master;
+ }
+
/**
* Get groups according to topic
*
@@ -283,4 +290,63 @@ public class TopicPSInfoManager {
topicPubInfoMap.clear();
topicSubInfoMap.clear();
}
+
+ /**
+ * Get the set of consumer groups that subscribe to the specified topics.
+ * If the queried group set is empty, no group filter is applied; if the
+ * subscribed topic set is empty, the candidates are all online consumer
+ * groups instead of the groups recorded for those topics.
+ *
+ * @param qryGroupSet groups to query, empty for all; matched entries may be removed
+ * @param subTopicSet subscribed topics to match, empty for any topic
+ * @return the matching groups
+ */
+ public Set<String> getGroupSetWithSubTopic(Set<String> qryGroupSet,
+ Set<String> subTopicSet) {
+ Set<String> resultSet = new HashSet<>();
+ if (subTopicSet.isEmpty()) {
+ // get all online group
+ ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+ List<String> onlineGroups = consumerHolder.getAllGroup();
+ if (!onlineGroups.isEmpty()) {
+ if (qryGroupSet.isEmpty()) {
+ resultSet.addAll(onlineGroups);
+ } else {
+ for (String group : qryGroupSet) {
+ if (onlineGroups.contains(group)) {
+ resultSet.add(group);
+ }
+ }
+ }
+ }
+ } else {
+ // filter subscribed online group
+ Set<String> tmpGroupSet;
+ if (qryGroupSet.isEmpty()) {
+ for (String topic : subTopicSet) {
+ tmpGroupSet = topicSubInfoMap.get(topic);
+ if (tmpGroupSet != null && !tmpGroupSet.isEmpty()) {
+ resultSet.addAll(tmpGroupSet);
+ }
+ }
+ } else {
+ for (String topic : subTopicSet) {
+ tmpGroupSet = topicSubInfoMap.get(topic);
+ if (tmpGroupSet == null || tmpGroupSet.isEmpty()) {
+ continue;
+ }
+ for (String group : qryGroupSet) {
+ if (tmpGroupSet.contains(group)) {
+ resultSet.add(group);
+ }
+ }
+ qryGroupSet.removeAll(resultSet);
+ if (qryGroupSet.isEmpty()) {
+ break;
+ }
+ }
+ }
+ }
+ return resultSet;
+ }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
index 97e9f89..b925ddf 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
@@ -446,11 +446,21 @@ public class ConsumerBandInfo {
}
public List<ConsumerInfo> getConsumerInfoList() {
- List<ConsumerInfo> result = new ArrayList<ConsumerInfo>();
+ List<ConsumerInfo> result = new ArrayList<>();
result.addAll(this.consumerInfoMap.values());
return result;
}
+ public List<ConsumerInfo> cloneConsumerInfoList() {
+ List<ConsumerInfo> result = new ArrayList<>();
+ for (ConsumerInfo consumer : this.consumerInfoMap.values()) {
+ if (consumer != null) {
+ result.add(consumer.clone());
+ }
+ }
+ return result;
+ }
+
public ConsumerInfo getConsumerInfo(String consumerId) {
return consumerInfoMap.get(consumerId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index fc5d932..d8a182e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -19,7 +19,6 @@ package org.apache.tubemq.server.master.nodemanage.nodeconsumer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -217,20 +216,73 @@ public class ConsumerInfoHolder {
}
public Set<String> getGroupTopicSet(String group) {
- if (group == null) {
- return new HashSet<>();
+ if (group != null) {
+ try {
+ rwLock.readLock().lock();
+ ConsumerBandInfo oldConsumeBandInfo =
+ groupInfoMap.get(group);
+ if (oldConsumeBandInfo != null) {
+ return oldConsumeBandInfo.getTopicSet();
+ }
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- return oldConsumeBandInfo.getTopicSet();
+ return Collections.emptySet();
+ }
+
+ /**
+ * Get the group's subscribed topics and a cloned list of its ConsumerInfos.
+ *
+ * @param group the group to be queried
+ * @param result holder that receives the query result; callers should only read it
+ * @return true if the group was found, false otherwise
+ */
+ public boolean getGroupTopicSetAndConsumerInfos(
+ String group, Tuple2<Set<String>, List<ConsumerInfo>> result) {
+ if (group != null) {
+ try {
+ rwLock.readLock().lock();
+ ConsumerBandInfo curGroupInfo =
+ groupInfoMap.get(group);
+ if (curGroupInfo != null) {
+ result.setF0AndF1(curGroupInfo.getTopicSet(),
+ curGroupInfo.cloneConsumerInfoList());
+ return true;
+ }
+ } finally {
+ rwLock.readLock().unlock();
}
- } finally {
- rwLock.readLock().unlock();
}
- return new HashSet<>();
+ result.setF0AndF1(Collections.emptySet(), Collections.emptyList());
+ return false;
+ }
+
+ /**
+ * Get the group's subscribed topics and client count.
+ *
+ * @param group the group to be queried
+ * @param result holder that receives the query result; callers should only read it
+ * @return true if the group was found, false otherwise
+ */
+ public boolean getGroupTopicSetAndClientCnt(String group,
+ Tuple2<Set<String>, Integer> result) {
+ if (group != null) {
+ try {
+ rwLock.readLock().lock();
+ ConsumerBandInfo curGroupInfo =
+ groupInfoMap.get(group);
+ if (curGroupInfo != null) {
+ result.setF0AndF1(curGroupInfo.getTopicSet(),
+ curGroupInfo.getGroupCnt());
+ return true;
+ }
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+ result.setF0AndF1(Collections.emptySet(), 0);
+ return false;
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index 60dd31b..502a2e2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -32,6 +32,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corerpc.exception.StandbyException;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
@@ -298,13 +299,16 @@ public class Master implements Action {
TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
master.getCurrentSubInfoMap();
+ int currPartSize = 0;
List<String> groupList = consumerHolder.getAllGroup();
+ Tuple2<Set<String>, List<ConsumerInfo>> queryInfo = new Tuple2<>();
for (String group : groupList) {
- Set<String> topicSet = consumerHolder.getGroupTopicSet(group);
- for (String topic : topicSet) {
- int currPartSize = 0;
- List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
- for (ConsumerInfo consumer : consumerList) {
+ if (!consumerHolder.getGroupTopicSetAndConsumerInfos(group, queryInfo)) {
+ continue;
+ }
+ for (String topic : queryInfo.getF0()) {
+ currPartSize = 0;
+ for (ConsumerInfo consumer : queryInfo.getF1()) {
Map<String, Map<String, Partition>> consumerSubInfoMap =
currentSubInfoMap.get(consumer.getConsumerId());
if (consumerSubInfoMap != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index 2e7e993..214bfce 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -27,10 +27,11 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.tubemq.corebase.cluster.Partition;
-import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
@@ -67,75 +68,56 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
* @return
*/
public StringBuilder getSubscribeInfo(HttpServletRequest req) {
- StringBuilder strBuffer = new StringBuilder();
- try {
- String strConsumeGroup = WebParameterUtils.validGroupParameter("consumeGroup",
- req.getParameter("consumeGroup"), TBaseConstants.META_MAX_GROUPNAME_LENGTH, false, "");
- String strTopicName = WebParameterUtils.validStringParameter("topicName",
- req.getParameter("topicName"), TBaseConstants.META_MAX_TOPICNAME_LENGTH, false, "");
- ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
- List<String> queryGroupSet = new ArrayList<>();
- if (TStringUtils.isEmpty(strTopicName)) {
- List<String> tmpGroupSet = consumerHolder.getAllGroup();
- if (!tmpGroupSet.isEmpty()) {
- if (TStringUtils.isEmpty(strConsumeGroup)) {
- for (String tmpGroup : tmpGroupSet) {
- if (tmpGroup != null) {
- queryGroupSet.add(tmpGroup);
- }
- }
- } else {
- if (tmpGroupSet.contains(strConsumeGroup)) {
- queryGroupSet.add(strConsumeGroup);
- }
- }
- }
- } else {
- Set<String> groupSet = topicPSInfoManager.getTopicSubInfo(strTopicName);
- if ((groupSet != null) && (!groupSet.isEmpty())) {
- if (TStringUtils.isEmpty(strConsumeGroup)) {
- for (String tmpGroup : groupSet) {
- if (tmpGroup != null) {
- queryGroupSet.add(tmpGroup);
- }
- }
- } else {
- if (groupSet.contains(strConsumeGroup)) {
- queryGroupSet.add(strConsumeGroup);
- }
- }
- }
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(1024);
+ // get group list
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSGROUPNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> inGroupNameSet = (Set<String>) result.retData1;
+ if (inGroupNameSet.isEmpty()) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSCONSUMEGROUP, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
}
- if (queryGroupSet.isEmpty()) {
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":0,\"data\":[]}");
- } else {
- int totalCnt = 0;
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"count\":")
- .append(queryGroupSet.size()).append(",\"data\":[");
- for (String tmpGroup : queryGroupSet) {
- Set<String> topicSet = consumerHolder.getGroupTopicSet(tmpGroup);
- final int consuemrCnt = consumerHolder.getConsumerCnt(tmpGroup);
- if (totalCnt++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"consumeGroup\":\"").append(tmpGroup).append("\",\"topicSet\":[");
- int topicCnt = 0;
- for (String tmpTopic : topicSet) {
- if (topicCnt++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("\"").append(tmpTopic).append("\"");
- }
- strBuffer.append("],\"consumerNum\":").append(consuemrCnt).append("}");
+ inGroupNameSet = (Set<String>) result.retData1;
+ }
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ Set<String> queryGroupSet =
+ topicPSInfoManager.getGroupSetWithSubTopic(inGroupNameSet, topicNameSet);
+ int totalCnt = 0;
+ int topicCnt = 0;
+ Tuple2<Set<String>, Integer> queryInfo = new Tuple2<>();
+ ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
+ for (String group : queryGroupSet) {
+ if (!consumerHolder.getGroupTopicSetAndClientCnt(group, queryInfo)) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuilder.append(",");
+ }
+ sBuilder.append("{\"consumeGroup\":\"").append(group).append("\",\"topicSet\":[");
+ topicCnt = 0;
+ for (String tmpTopic : queryInfo.getF0()) {
+ if (topicCnt++ > 0) {
+ sBuilder.append(",");
}
- strBuffer.append("]}");
+ sBuilder.append("\"").append(tmpTopic).append("\"");
}
- } catch (Throwable e) {
- strBuffer.append("{\"result\":false,\"errCode\":500,\"errMsg\":\"Exception on process")
- .append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
+ sBuilder.append("],\"consumerNum\":").append(queryInfo.getF1()).append("}");
}
- return strBuffer;
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, totalCnt);
+ return sBuilder;
}
/**
@@ -146,10 +128,19 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
*/
// #lizard forgives
public StringBuilder getConsumeGroupDetailInfo(HttpServletRequest req) {
- StringBuilder strBuffer = new StringBuilder(1024);
+ ProcessResult result = new ProcessResult();
+ StringBuilder sBuilder = new StringBuilder(1024);
+ // get group name
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null, result)) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CONSUMEGROUP, true, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ }
+ String strConsumeGroup = (String) result.retData1;
try {
- String strConsumeGroup = WebParameterUtils.validGroupParameter("consumeGroup",
- req.getParameter("consumeGroup"), TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
boolean isBandConsume = false;
boolean isNotAllocate = false;
boolean isSelectBig = true;
@@ -194,78 +185,78 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
rebalanceCheckTime = consumerBandInfo.getCurCheckCycle();
}
}
- strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
+ sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
.append(",\"count\":").append(consumerList.size()).append(",\"topicSet\":[");
int itemCnt = 0;
for (String topicItem : topicSet) {
if (itemCnt++ > 0) {
- strBuffer.append(",");
+ sBuilder.append(",");
}
- strBuffer.append("\"").append(topicItem).append("\"");
+ sBuilder.append("\"").append(topicItem).append("\"");
}
- strBuffer.append("],\"consumeGroup\":\"").append(strConsumeGroup).append("\",\"re-rebalance\":{");
+ sBuilder.append("],\"consumeGroup\":\"").append(strConsumeGroup).append("\",\"re-rebalance\":{");
itemCnt = 0;
for (Map.Entry<String, NodeRebInfo> entry : nodeRebInfoMap.entrySet()) {
if (itemCnt++ > 0) {
- strBuffer.append(",");
+ sBuilder.append(",");
}
- strBuffer.append("\"").append(entry.getKey()).append("\":");
- strBuffer = entry.getValue().toJsonString(strBuffer);
+ sBuilder.append("\"").append(entry.getKey()).append("\":");
+ sBuilder = entry.getValue().toJsonString(sBuilder);
}
- strBuffer.append("},\"isBandConsume\":").append(isBandConsume);
+ sBuilder.append("},\"isBandConsume\":").append(isBandConsume);
// Append band consume info
if (isBandConsume) {
- strBuffer.append(",\"isNotAllocate\":").append(isNotAllocate)
+ sBuilder.append(",\"isNotAllocate\":").append(isNotAllocate)
.append(",\"sessionKey\":\"").append(sessionKey)
.append("\",\"isSelectBig\":").append(isSelectBig)
.append(",\"reqSourceCount\":").append(reqSourceCount)
.append(",\"curSourceCount\":").append(curSourceCount)
.append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
}
- strBuffer.append(",\"rebInfo\":{");
+ sBuilder.append(",\"rebInfo\":{");
if (rebalanceStatus == -2) {
- strBuffer.append("\"isRebalanced\":false");
+ sBuilder.append("\"isRebalanced\":false");
} else if (rebalanceStatus == 0) {
- strBuffer.append("\"isRebalanced\":true,\"checkPasted\":false")
+ sBuilder.append("\"isRebalanced\":true,\"checkPasted\":false")
.append(",\"defBClientRate\":").append(defBClientRate)
.append(",\"confBClientRate\":").append(confBClientRate)
.append(",\"curBClientRate\":").append(curBClientRate)
.append(",\"minRequireClientCnt\":").append(minRequireClientCnt);
} else {
- strBuffer.append("\"isRebalanced\":true,\"checkPasted\":true")
+ sBuilder.append("\"isRebalanced\":true,\"checkPasted\":true")
.append(",\"defBClientRate\":").append(defBClientRate)
.append(",\"confBClientRate\":").append(confBClientRate)
.append(",\"curBClientRate\":").append(curBClientRate);
}
- strBuffer.append("},\"filterConds\":{");
+ sBuilder.append("},\"filterConds\":{");
if (existedTopicConditions != null) {
int keyCount = 0;
for (Map.Entry<String, TreeSet<String>> entry : existedTopicConditions.entrySet()) {
if (keyCount++ > 0) {
- strBuffer.append(",");
+ sBuilder.append(",");
}
- strBuffer.append("\"").append(entry.getKey()).append("\":[");
+ sBuilder.append("\"").append(entry.getKey()).append("\":[");
if (entry.getValue() != null) {
int itemCount = 0;
for (String filterCond : entry.getValue()) {
if (itemCount++ > 0) {
- strBuffer.append(",");
+ sBuilder.append(",");
}
- strBuffer.append("\"").append(filterCond).append("\"");
+ sBuilder.append("\"").append(filterCond).append("\"");
}
}
- strBuffer.append("]");
+ sBuilder.append("]");
}
}
- strBuffer.append("}");
+ sBuilder.append("}");
// Append consumer info of the group
- getConsumerInfoList(consumerList, isBandConsume, strBuffer);
- strBuffer.append("}");
+ getConsumerInfoList(consumerList, isBandConsume, sBuilder);
+ sBuilder.append("}");
} catch (Exception e) {
- strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append(e.getMessage()).append("\",\"count\":0,\"data\":[]}");
}
- return strBuffer;
+ return sBuilder;
}
/**
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
index 76e9cae..ba988aa 100644
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
@@ -31,7 +31,7 @@ public class TopicPSInfoManagerTest {
@Before
public void setUp() throws Exception {
- topicPSInfoManager = new TopicPSInfoManager();
+ topicPSInfoManager = new TopicPSInfoManager(null);
}
@After