You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/14 04:19:15 UTC
[incubator-inlong] branch master updated: [INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 514b53e [INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)
514b53e is described below
commit 514b53eccb9d278289f6225f73c8a8ac5099d500
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Nov 14 12:19:09 2021 +0800
[INLONG-1789]Adjust the logic of the consumer group on the Master side (#1790)
---
.../inlong/tubemq/corebase/TErrCodeConstants.java | 14 +
.../tubemq/corebase/cluster/ConsumerInfo.java | 164 -----
.../server/common/paramcheck/PBParameterUtils.java | 178 +----
.../tubemq/server/common/utils/ClientSyncInfo.java | 113 ++++
.../inlong/tubemq/server/master/TMaster.java | 143 ++--
.../server/master/balance/DefaultLoadBalancer.java | 247 +++----
.../tubemq/server/master/balance/LoadBalancer.java | 5 +-
.../server/master/metamanage/MetaDataManager.java | 2 +-
.../nodemanage/nodebroker/TopicPSInfoManager.java | 77 +--
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 725 +++++++++++++++++++++
.../nodemanage/nodeconsumer/ConsumeType.java | 55 ++
.../nodemanage/nodeconsumer/ConsumerBandInfo.java | 495 --------------
.../nodeconsumer/ConsumerEventManager.java | 20 +-
.../nodemanage/nodeconsumer/ConsumerInfo.java | 244 +++++++
.../nodeconsumer/ConsumerInfoHolder.java | 595 ++++++++---------
.../nodemanage/nodeconsumer/TopicConfigInfo.java | 65 ++
.../server/master/web/action/screen/Master.java | 104 +--
.../web/handler/WebAdminGroupCtrlHandler.java | 10 +-
.../master/web/handler/WebOtherInfoHandler.java | 81 ++-
.../nodebroker/TopicPSInfoManagerTest.java | 20 +-
20 files changed, 1808 insertions(+), 1549 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
index 3210122..30ff827 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TErrCodeConstants.java
@@ -27,6 +27,7 @@ public class TErrCodeConstants {
public static final int BAD_REQUEST = 400;
public static final int UNAUTHORIZED = 401;
+ public static final int CONNECT_RETURN_NULL = 402;
public static final int FORBIDDEN = 403;
public static final int NOT_FOUND = 404;
public static final int ALL_PARTITION_FROZEN = 405;
@@ -38,6 +39,18 @@ public class TErrCodeConstants {
public static final int DUPLICATE_PARTITION = 412;
public static final int CERTIFICATE_FAILURE = 415;
public static final int SERVER_RECEIVE_OVERFLOW = 419;
+ public static final int CLIENT_SHUTDOWN = 420;
+ public static final int CLIENT_HIGH_FREQUENCY_REQUEST = 421;
+ public static final int PARTITION_UNSUBSCRIBABLE = 422;
+ public static final int PARTITION_UNPUBLISHABLE = 423;
+ public static final int CLIENT_INCONSISTENT_CONSUMETYPE = 424;
+ public static final int CLIENT_INCONSISTENT_TOPICSET = 425;
+ public static final int CLIENT_INCONSISTENT_FILTERSET = 426;
+ public static final int CLIENT_INCONSISTENT_SESSIONKEY = 427;
+ public static final int CLIENT_INCONSISTENT_SELECTBIG = 428;
+ public static final int CLIENT_INCONSISTENT_SOURCECOUNT = 429;
+ public static final int CLIENT_DUPLICATE_INDEXID = 430;
+
public static final int CONSUME_GROUP_FORBIDDEN = 450;
public static final int SERVER_CONSUME_SPEED_LIMIT = 452;
public static final int CONSUME_CONTENT_FORBIDDEN = 455;
@@ -45,6 +58,7 @@ public class TErrCodeConstants {
public static final int INTERNAL_SERVER_ERROR = 500;
public static final int SERVICE_UNAVAILABLE = 503;
public static final int INTERNAL_SERVER_ERROR_MSGSET_NULL = 510;
+ public static final int UNSPECIFIED_ABNORMAL = 599;
public static final List<Integer> IGNORE_ERROR_SET =
Arrays.asList(BAD_REQUEST, NOT_FOUND, ALL_PARTITION_FROZEN,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java
deleted file mode 100644
index 9a1e32c..0000000
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/cluster/ConsumerInfo.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.corebase.cluster;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-
-public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
-
- private static final long serialVersionUID = 3095734962491009711L;
-
- private final String consumerId;
- private final String group;
- private final Set<String> topicSet;
- private final Map<String, TreeSet<String>> topicConditions;
- private boolean requireBound = false;
- private String sessionKey = "";
- private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
- private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
- private boolean overTLS = false;
- private Map<String, Long> requiredPartition;
-
- public ConsumerInfo(String consumerId,
- boolean overTLS,
- String group,
- Set<String> topicSet,
- Map<String, TreeSet<String>> topicConditions,
- boolean requireBound,
- String sessionKey,
- long startTime,
- int sourceCount,
- Map<String, Long> requiredPartition) {
- this.consumerId = consumerId;
- this.overTLS = overTLS;
- this.group = group;
- this.topicSet = topicSet;
- if (topicConditions == null) {
- this.topicConditions =
- new HashMap<>();
- } else {
- this.topicConditions = topicConditions;
- }
- this.requireBound = requireBound;
- this.sessionKey = sessionKey;
- this.startTime = startTime;
- this.sourceCount = sourceCount;
- this.requiredPartition = requiredPartition;
- }
-
- @Override
- public String toString() {
- StringBuilder sBuilder = new StringBuilder(512);
- sBuilder.append(consumerId);
- sBuilder.append("@");
- sBuilder.append(group);
- sBuilder.append(":");
- int count = 0;
- for (String topicItem : topicSet) {
- if (count++ > 0) {
- sBuilder.append(",");
- }
- sBuilder.append(topicItem);
- }
- sBuilder.append("@overTLS=").append(overTLS);
- return sBuilder.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof ConsumerInfo)) {
- return false;
- }
- final ConsumerInfo info = (ConsumerInfo) obj;
- return (this.consumerId.equals(info.getConsumerId()));
- }
-
- @Override
- public int compareTo(ConsumerInfo o) {
- if (!this.consumerId.equals(o.consumerId)) {
- return this.consumerId.compareTo(o.consumerId);
- }
- if (!this.group.equals(o.group)) {
- return this.group.compareTo(o.group);
- }
- return 0;
- }
-
- public boolean isRequireBound() {
- return this.requireBound;
- }
-
- public String getSessionKey() {
- return this.sessionKey;
- }
-
- public void setSessionKey(String sessionKey) {
- this.sessionKey = sessionKey;
- }
-
- public long getStartTime() {
- return this.startTime;
- }
-
- public int getSourceCount() {
- return this.sourceCount;
- }
-
- public Map<String, Long> getRequiredPartition() {
- return this.requiredPartition;
- }
-
- public String getConsumerId() {
- return consumerId;
- }
-
- public String getGroup() {
- return group;
- }
-
- public Set<String> getTopicSet() {
- return topicSet;
- }
-
- public Map<String, TreeSet<String>> getTopicConditions() {
- return topicConditions;
- }
-
- public boolean isOverTLS() {
- return overTLS;
- }
-
- @Override
- public ConsumerInfo clone() {
- return new ConsumerInfo(this.consumerId, this.overTLS, this.group, this.topicSet,
- this.topicConditions, this.requireBound, this.sessionKey,
- this.startTime, this.sourceCount, this.requiredPartition);
- }
-
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
index 0ae10a5..83ba6d7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -22,11 +22,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.broker.metadata.MetadataManager;
@@ -36,7 +34,8 @@ import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,13 +121,13 @@ public class PBParameterUtils {
return retResult;
}
- public static ParamCheckResult checkConsumerOffsetSetInfo(boolean isReqConsumeBand,
+ public static ParamCheckResult checkConsumerOffsetSetInfo(ConsumeType csmType,
final Set<String> reqTopicSet,
final String requiredParts,
final StringBuilder strBuffer) {
Map<String, Long> requiredPartMap = new HashMap<>();
ParamCheckResult retResult = new ParamCheckResult();
- if (!isReqConsumeBand) {
+ if (csmType != ConsumeType.CONSUME_BAND) {
retResult.setCheckData(requiredPartMap);
return retResult;
}
@@ -200,13 +199,15 @@ public class PBParameterUtils {
return retResult;
}
GroupResCtrlEntity offsetResetGroupEntity =
- defMetaDataManager.confGetGroupResCtrlConf(inConsumerInfo.getGroup());
+ defMetaDataManager.confGetGroupResCtrlConf(inConsumerInfo.getGroupName());
if (masterConfig.isStartOffsetResetCheck()) {
if (offsetResetGroupEntity == null) {
retResult.setCheckResult(false,
TErrCodeConstants.BAD_REQUEST,
- "[unauthorized subscribe] ConsumeGroup must be authorized by administrator before"
- + " using bound subscribe, please contact to administrator!");
+ strBuffer.append("[unauthorized subscribe] ConsumeGroup must be ")
+ .append("authorized by administrator before using bound subscribe")
+ .append(", please contact to administrator!").toString());
+ strBuffer.delete(0, strBuffer.length());
return retResult;
}
}
@@ -232,167 +233,6 @@ public class PBParameterUtils {
return retResult;
}
- // #lizard forgives
- public static ParamCheckResult validConsumerExistInfo(ConsumerInfo inConsumerInfo,
- boolean isSelectBig,
- ConsumerBandInfo consumerBandInfo,
- final StringBuilder strBuffer) throws Exception {
- // This part is mainly to check whether the newly accessed client is consistent with the existing
- // consumer consumption target
- ParamCheckResult retResult = new ParamCheckResult();
- if (consumerBandInfo == null) {
- retResult.setCheckData(inConsumerInfo);
- return retResult;
- }
- // check whether the consumer behavior is consistent
- if (inConsumerInfo.isRequireBound() != consumerBandInfo.isBandConsume()) {
- if (inConsumerInfo.isRequireBound()) {
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append(" using bound subscribe is inconsistency ")
- .append("with other consumers using unbound subscribe in the group");
- } else {
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append(" using unbound subscribe is inconsistency with other consumers")
- .append(" using bound subscribe in the group");
- }
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- // check the topics of consumption
- List<ConsumerInfo> infoList = consumerBandInfo.getConsumerInfoList();
- Set<String> existedTopics = consumerBandInfo.getTopicSet();
- Map<String, TreeSet<String>> existedTopicConditions = consumerBandInfo.getTopicConditions();
- if (existedTopics != null && !existedTopics.isEmpty()) {
- if (existedTopics.size() != inConsumerInfo.getTopicSet().size()
- || !existedTopics.containsAll(inConsumerInfo.getTopicSet())) {
-
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append(" subscribed topics ").append(inConsumerInfo.getTopicSet())
- .append(" is inconsistency with other consumers in the group, existedTopics: ")
- .append(existedTopics).toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- }
- if (infoList != null && !infoList.isEmpty()) {
- boolean isCondEqual = true;
- if (existedTopicConditions == null || existedTopicConditions.isEmpty()) {
- if (inConsumerInfo.getTopicConditions().isEmpty()) {
- isCondEqual = true;
- } else {
- isCondEqual = false;
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append(" subscribe with filter condition ")
- .append(inConsumerInfo.getTopicConditions())
- .append(" is inconsistency with other consumers in the group: topic without conditions");
- }
- } else {
- // check the filter conditions of the topic
- if (inConsumerInfo.getTopicConditions().isEmpty()) {
- isCondEqual = false;
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append(" subscribe without filter condition ")
- .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
- .append(existedTopicConditions);
- } else {
- Set<String> existedCondTopics = existedTopicConditions.keySet();
- Set<String> reqCondTopics = inConsumerInfo.getTopicConditions().keySet();
- if (existedCondTopics.size() != reqCondTopics.size()
- || !existedCondTopics.containsAll(reqCondTopics)) {
- isCondEqual = false;
- strBuffer.append("[Inconsistency subscribe] ")
- .append(inConsumerInfo.getConsumerId())
- .append(" subscribe with filter condition ")
- .append(inConsumerInfo.getTopicConditions())
- .append(" is inconsistency with other consumers in the group, ")
- .append("existed topic conditions is ")
- .append(existedTopicConditions);
- } else {
- isCondEqual = true;
- for (String topicKey : existedCondTopics) {
- if ((existedTopicConditions.get(topicKey).size()
- != inConsumerInfo.getTopicConditions().get(topicKey).size())
- || (!existedTopicConditions.get(topicKey).containsAll(inConsumerInfo
- .getTopicConditions().get(topicKey)))) {
- isCondEqual = false;
- strBuffer.append("[Inconsistency subscribe] ")
- .append(inConsumerInfo.getConsumerId())
- .append(" subscribe with filter condition ")
- .append(inConsumerInfo.getTopicConditions())
- .append(" is inconsistency with other consumers ")
- .append("in the group, existed topic conditions is ")
- .append(existedTopicConditions);
- break;
- }
- }
- }
- }
- }
- if (!isCondEqual) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- }
- if (inConsumerInfo.isRequireBound()) {
- // If the sessionKey is inconsistent, it means that the previous round of consumption has not completely
- // exited. In order to avoid the incomplete offset setting, it is necessary to completely clear the above
- // data before resetting and consuming this round of consumption
- if (!inConsumerInfo.getSessionKey().equals(consumerBandInfo.getSessionKey())) {
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append("'s sessionKey is inconsistency with other consumers in the group, required is ")
- .append(consumerBandInfo.getSessionKey()).append(", request is ")
- .append(inConsumerInfo.getSessionKey());
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- // check the offset config
- if (isSelectBig != consumerBandInfo.isSelectedBig()) {
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append("'s isSelectBig is inconsistency with other consumers in the group, required is ")
- .append(consumerBandInfo.isSelectedBig())
- .append(", request is ").append(isSelectBig);
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- // check the consumers count
- if (inConsumerInfo.getSourceCount() != consumerBandInfo.getSourceCount()) {
- strBuffer.append("[Inconsistency subscribe] ").append(inConsumerInfo.getConsumerId())
- .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
- .append(consumerBandInfo.getSourceCount())
- .append(", request is ").append(inConsumerInfo.getSourceCount());
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.toString());
- logger.warn(strBuffer.toString());
- return retResult;
- }
- }
- boolean registered = false;
- if (infoList != null) {
- for (ConsumerInfo info : infoList) {
- if (info.getConsumerId().equals(inConsumerInfo.getConsumerId())) {
- registered = true;
- }
- }
- }
- retResult.setCheckData(registered);
- return retResult;
- }
-
/**
* Check the id of broker
*
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
new file mode 100644
index 0000000..72f0773
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.common.utils;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TokenConstants;
+import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+
+public class ClientSyncInfo { // Holds the sync fields copied out of a ClientMaster.ClientSubRepInfo report.
+ private boolean updated = false; // becomes true once any field below has been taken from a report
+ private long brokerConfigId = TBaseConstants.META_VALUE_UNDEFINED; // stays undefined until a report carries it
+ private long topicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED; // stays undefined until a report carries it
+ private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED; // stays undefined until a report carries it
+ private boolean repPartInfo = false; // value of the report's reportSubInfo flag
+ private final Set<Partition> subPartSet = new HashSet<>(); // partitions parsed from the report's partSubInfo list
+
+ public ClientSyncInfo() { // all fields start at the defaults declared above
+
+ }
+
+ public void updSubRepInfo(BrokerRunManager brokerRunManager, // used to resolve broker ids to BrokerInfo
+ ClientMaster.ClientSubRepInfo clientSubRepInfo) { // report to copy from; null is ignored
+ if (clientSubRepInfo == null) {
+ return; // nothing reported: leave the current state untouched
+ }
+ if (clientSubRepInfo.hasBrokerConfigId()) { // copy only fields that are present in the report
+ this.brokerConfigId = clientSubRepInfo.getBrokerConfigId();
+ this.updated = true;
+ }
+ if (clientSubRepInfo.hasTopicMetaInfoId()) {
+ this.topicMetaInfoId = clientSubRepInfo.getTopicMetaInfoId();
+ this.updated = true;
+ }
+ if (clientSubRepInfo.hasLstAssignedTime()) {
+ this.lstAssignedTime = clientSubRepInfo.getLstAssignedTime();
+ this.updated = true;
+ }
+ if (clientSubRepInfo.hasReportSubInfo()) {
+ this.repPartInfo = clientSubRepInfo.getReportSubInfo();
+ if (this.repPartInfo) { // NOTE(review): subPartSet is only added to here; entries persist until clear() is called
+ for (String info : clientSubRepInfo.getPartSubInfoList()) {
+ if (TStringUtils.isBlank(info)) {
+ continue; // skip blank entries
+ }
+ String[] strInfo = info.split(TokenConstants.SEGMENT_SEP); // presumably "<topic><SEGMENT_SEP><part list>" - TODO confirm
+ String[] strPartInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); // NOTE(review): no length check; throws if the entry has no second segment
+ for (String partStr : strPartInfoSet) {
+ String[] strPartInfo = partStr.split(TokenConstants.ATTR_SEP); // [0] is parsed as a broker id, [1] as an int (presumably the partition id)
+ BrokerInfo brokerInfo =
+ brokerRunManager.getBrokerInfo(Integer.parseInt(strPartInfo[0]));
+ if (brokerInfo == null) {
+ continue; // broker id not known to brokerRunManager: drop this entry
+ }
+ subPartSet.add(new Partition(brokerInfo,
+ strInfo[0], Integer.parseInt(strPartInfo[1]))); // NOTE(review): parseInt / index errors propagate to the caller
+ }
+ }
+ }
+ this.updated = true; // set whenever the flag is present, even if it is false
+ }
+ }
+
+ public boolean isUpdated() { // true if the last updSubRepInfo calls copied at least one field
+ return updated;
+ }
+
+ public long getBrokerConfigId() {
+ return brokerConfigId;
+ }
+
+ public long getTopicMetaInfoId() {
+ return topicMetaInfoId;
+ }
+
+ public long getLstAssignedTime() {
+ return lstAssignedTime;
+ }
+
+ public Tuple2<Boolean, Set<Partition>> getRepSubInfo() { // pair of (repPartInfo, subPartSet)
+ return new Tuple2<>(this.repPartInfo, this.subPartSet); // the set is the internal instance, not a copy
+ }
+
+ public void clear() { // resets every field to the value it has after construction
+ this.updated = false;
+ this.brokerConfigId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.topicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
+ this.repPartInfo = false;
+ this.subPartSet.clear();
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index c40a6f1..0bee4de 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -41,7 +41,6 @@ import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
import org.apache.inlong.tubemq.corebase.balance.EventStatus;
import org.apache.inlong.tubemq.corebase.balance.EventType;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
@@ -77,7 +76,6 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.Registe
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2CV2;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
@@ -112,8 +110,10 @@ import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHol
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerEventManager;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeproducer.ProducerInfoHolder;
import org.apache.inlong.tubemq.server.master.utils.Chore;
@@ -178,7 +178,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
false, TBaseConstants.META_VALUE_UNDEFINED);
this.producerHolder = new ProducerInfoHolder();
- this.consumerHolder = new ConsumerInfoHolder();
+ this.consumerHolder = new ConsumerInfoHolder(this.masterConfig);
this.consumerEventManager = new ConsumerEventManager(consumerHolder);
this.topicPSInfoManager = new TopicPSInfoManager(this);
this.loadBalancer = new DefaultLoadBalancer();
@@ -551,9 +551,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
- boolean isReqConsumeBand = (request.hasRequireBound() && request.getRequireBound());
+ ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
+ ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
- paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(isReqConsumeBand,
+ paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(csmType,
reqTopicSet, requiredParts, strBuffer);
if (!paramCheckResult.result) {
builder.setErrCode(paramCheckResult.errCode);
@@ -568,13 +569,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
? request.getSessionTime() : System.currentTimeMillis();
int sourceCount = request.hasTotalCount()
? request.getTotalCount() : -1;
- int reqQryPriorityId = request.hasQryPriorityId()
+ int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
+ List<SubscribeInfo> subscribeList =
+ DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
+ boolean isNotAllocated = true;
+ if (CollectionUtils.isNotEmpty(subscribeList)
+ || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
+ isNotAllocated = false;
+ }
boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
+ // build consumer object
ConsumerInfo inConsumerInfo =
new ConsumerInfo(consumerId, overtls, groupName,
- reqTopicSet, reqTopicConditions, isReqConsumeBand,
- sessionKey, sessionTime, sourceCount, requiredPartMap);
+ reqTopicSet, reqTopicConditions, csmType,
+ sessionKey, sessionTime, sourceCount, isSelectBig, requiredPartMap);
paramCheckResult =
PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
masterConfig, defMetaDataManager, brokerRunManager, strBuffer);
@@ -606,38 +615,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
// need removed for authorize center end
Integer lid = null;
+ ConsumeGroupInfo consumeGroupInfo = null;
try {
lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(groupName);
- paramCheckResult =
- PBParameterUtils.validConsumerExistInfo(inConsumerInfo2, isSelectBig, consumerBandInfo, strBuffer);
- if (!paramCheckResult.result) {
+ if (!consumerHolder.addConsumer(inConsumerInfo2,
+ isNotAllocated, strBuffer, paramCheckResult)) {
builder.setErrCode(paramCheckResult.errCode);
builder.setErrMsg(paramCheckResult.errMsg);
return builder.build();
}
- boolean registered = (consumerBandInfo != null) && (boolean) paramCheckResult.checkData;
- List<SubscribeInfo> subscribeList =
- DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
- boolean isNotAllocated = true;
- if (CollectionUtils.isNotEmpty(subscribeList)
- || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
- isNotAllocated = false;
- }
- if ((consumerBandInfo != null && consumerBandInfo.getConsumerInfoList() == null) || !registered) {
- consumerHolder.addConsumer(inConsumerInfo2, isNotAllocated, isSelectBig);
- }
- for (String topic : reqTopicSet) {
- ConcurrentHashSet<String> groupSet =
- topicPSInfoManager.getTopicSubInfo(topic);
- if (groupSet == null) {
- groupSet = new ConcurrentHashSet<>();
- topicPSInfoManager.setTopicSubInfo(topic, groupSet);
- }
- if (!groupSet.contains(groupName)) {
- groupSet.add(groupName);
- }
- }
+ consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData;
+ topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
if (CollectionUtils.isNotEmpty(subscribeList)) {
Map<String, Map<String, Partition>> topicPartSubMap =
new HashMap<>();
@@ -663,8 +651,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
logger.info(strBuffer.append("[Consumer Register] ")
- .append(consumerId).append(", isOverTLS=").append(overtls)
- .append(", clientJDKVer=").append(clientJdkVer).toString());
+ .append(consumerId).append(", isOverTLS=").append(overtls)
+ .append(", clientJDKVer=").append(clientJdkVer).toString());
strBuffer.delete(0, strBuffer.length());
if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) {
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
@@ -693,7 +681,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
- builder.setNotAllocated(consumerHolder.isNotAllocated(groupName));
+ builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
@@ -742,8 +730,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
final String groupName = (String) paramCheckResult.checkData;
checkNodeStatus(clientId, strBuffer);
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(groupName);
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
builder.setErrMsg(strBuffer.append("Not found groupName ")
.append(groupName).append(" in holder!").toString());
@@ -752,7 +740,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// authorize check
CertifiedResult authorizeResult =
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, consumerBandInfo.getTopicSet(), consumerBandInfo.getTopicConditions(), rmtAddress);
+ groupName, consumeGroupInfo.getTopicSet(), consumeGroupInfo.getTopicConditions(), rmtAddress);
if (!authorizeResult.result) {
builder.setErrCode(authorizeResult.errCode);
builder.setErrMsg(authorizeResult.errInfo);
@@ -816,7 +804,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
logger.info(processedEvent.toStrBuilder(strBuffer).toString());
strBuffer.delete(0, strBuffer.length());
try {
- consumerHolder.setAllocated(groupName);
+ consumeGroupInfo.settAllocated();
consumerEventManager.removeFirst(clientId);
} catch (Throwable e) {
logger.warn("Unknown exception for remove first event:", e);
@@ -868,7 +856,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
- builder.setNotAllocated(consumerHolder.isNotAllocated(groupName));
+ builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
@@ -1289,8 +1277,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return;
}
final boolean isStartBalance = startupBalance;
- List<String> groupsNeedToBalance =
- isStartBalance ? consumerHolder.getAllGroup() : getNeedToBalanceGroupList(strBuffer);
+ List<String> groupsNeedToBalance = isStartBalance
+ ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(strBuffer);
strBuffer.delete(0, strBuffer.length());
if (!groupsNeedToBalance.isEmpty()) {
// set parallel rebalance signal
@@ -1360,12 +1348,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// choose different load balance strategy
if (isFirstReb) {
finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
- brokerRunManager, groups, defMetaDataManager,
- masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+ brokerRunManager, groups, defMetaDataManager, strBuffer);
} else {
finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
- consumerHolder, brokerRunManager, groups, defMetaDataManager,
- masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
+ consumerHolder, brokerRunManager, groups, defMetaDataManager, strBuffer);
}
// allocate partitions to consumers
for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
@@ -1376,15 +1362,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- Tuple2<String, ConsumerInfo> tupleInfo =
- consumerHolder.getConsumeTupleInfo(consumerId);
- if (tupleInfo == null
- || tupleInfo.getF0() == null
- || tupleInfo.getF1() == null) {
+ ConsumerInfo consumerInfo =
+ consumerHolder.getConsumerInfo(consumerId);
+ if (consumerInfo == null) {
continue;
}
Set<String> blackTopicSet =
- defMetaDataManager.getDisableConsumeTopicByGroupName(tupleInfo.getF0());
+ defMetaDataManager.getDisableTopicByGroupName(consumerInfo.getGroupName());
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1405,7 +1389,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
currentPartMap = new HashMap<>();
}
}
- if (tupleInfo.getF1().isOverTLS()) {
+ if (consumerInfo.isOverTLS()) {
for (Partition currentPart : currentPartMap.values()) {
if (!blackTopicSet.contains(currentPart.getTopic())) {
boolean found = false;
@@ -1421,8 +1405,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
- tupleInfo.getF1().isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
+ consumerInfo.isOverTLS(), currentPart));
}
for (Partition finalPart : finalPartList) {
if (!blackTopicSet.contains(finalPart.getTopic())) {
@@ -1438,7 +1422,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
continue;
}
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.getF0(), true, finalPart));
+ consumerInfo.getGroupName(), true, finalPart));
}
}
} else {
@@ -1446,14 +1430,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicSet.contains(currentPart.getTopic()))
|| (!finalPartList.contains(currentPart))) {
deletedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.getF0(), false, currentPart));
+ consumerInfo.getGroupName(), false, currentPart));
}
}
for (Partition finalPart : finalPartList) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicSet.contains(finalPart.getTopic()))) {
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.getF0(), false, finalPart));
+ consumerInfo.getGroupName(), false, finalPart));
}
}
}
@@ -1516,16 +1500,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- Tuple2<String, ConsumerInfo> tupleInfo =
- consumerHolder.getConsumeTupleInfo(consumerId);
- if (tupleInfo == null
- || tupleInfo.getF0() == null
- || tupleInfo.getF1() == null) {
+ ConsumerInfo consumerInfo =
+ consumerHolder.getConsumerInfo(consumerId);
+ if (consumerInfo == null) {
continue;
}
// allocate partitions to consumers
Set<String> blackTopicSet =
- defMetaDataManager.getDisableConsumeTopicByGroupName(tupleInfo.getF0());
+ defMetaDataManager.getDisableTopicByGroupName(consumerInfo.getGroupName());
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1552,15 +1534,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicSet.contains(currentPart.getTopic()))
|| (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
- tupleInfo.getF1().isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
+ consumerInfo.isOverTLS(), currentPart));
}
}
for (Partition finalPart : finalPartMap.values()) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicSet.contains(finalPart.getTopic()))) {
- addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
- tupleInfo.getF1().isOverTLS(), finalPart));
+ addedSubInfoList.add(new SubscribeInfo(consumerId,
+ consumerInfo.getGroupName(), consumerInfo.isOverTLS(), finalPart));
}
}
}
@@ -1665,7 +1647,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @param strBuffer
* @return
*/
- private List<String> getNeedToBalanceGroupList(final StringBuilder strBuffer) {
+ private List<String> getNeedToBalanceGroups(final StringBuilder strBuffer) {
List<String> groupsNeedToBalance = new ArrayList<>();
Set<String> groupHasUnfinishedEvent = new HashSet<>();
if (consumerEventManager.hasEvent()) {
@@ -1677,7 +1659,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- String group = consumerHolder.getGroup(consumerId);
+ String group = consumerHolder.getGroupName(consumerId);
if (group == null) {
continue;
}
@@ -1698,7 +1680,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
consumerEventManager.updateUnfinishedCountMap(groupHasUnfinishedEvent);
- List<String> allGroups = consumerHolder.getAllGroup();
+ List<String> allGroups = consumerHolder.getAllServerBalanceGroups();
if (groupHasUnfinishedEvent.isEmpty()) {
for (String group : allGroups) {
if (group != null) {
@@ -1899,18 +1881,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
String group = nodeStrs[1];
Integer lid = null;
try {
- lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
+ lid = masterRowLock.getLock(null,
+ StringUtils.getBytesUtf8(consumerId), true);
ConsumerInfo info = consumerHolder.removeConsumer(group, consumerId);
currentSubInfo.remove(consumerId);
consumerEventManager.removeAll(consumerId);
- List<ConsumerInfo> consumerList =
- consumerHolder.getConsumerList(group);
- if (consumerList == null || consumerList.isEmpty()) {
- if (info != null) {
- for (String topic : info.getTopicSet()) {
- topicPSInfoManager.removeTopicSubInfo(topic, group);
- }
- }
+ if (info != null && consumerHolder.isConsumeGroupEmpty(group)) {
+ topicPSInfoManager.rmvGroupSubTopicInfo(group, info.getTopicSet());
}
} catch (IOException e) {
logger.warn("Failed to lock.", e);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
index c8bfdab..39e4830 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -30,16 +30,13 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
-import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorageInfo;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.RebProcessInfo;
@@ -63,7 +60,6 @@ public class DefaultLoadBalancer implements LoadBalancer {
* @param brokerRunManager
* @param groupSet
* @param metaDataManager
- * @param defAllowBClientRate
* @param strBuffer
* @return
*/
@@ -74,36 +70,31 @@ public class DefaultLoadBalancer implements LoadBalancer {
BrokerRunManager brokerRunManager,
List<String> groupSet,
MetaDataManager metaDataManager,
- int defAllowBClientRate,
StringBuilder strBuffer) {
// #lizard forgives
// load balance according to group
Map<String/* consumer */,
Map<String/* topic */, List<Partition>>> finalSubInfoMap =
new HashMap<>();
- Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<>();
+ Map<String, RebProcessInfo> rejGroupClientInfoMap = new HashMap<>();
Set<String> onlineOfflineGroupSet = new HashSet<>();
- Set<String> bandGroupSet = new HashSet<>();
+ Set<String> boundGroupSet = new HashSet<>();
for (String group : groupSet) {
if (group == null) {
continue;
}
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
+ if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
continue;
}
- List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
+ List<ConsumerInfo> consumerList =
+ consumeGroupInfo.getConsumerInfoList();
if (CollectionUtils.isEmpty(consumerList)) {
continue;
}
- // deal with regular consumer allocation, band consume allocation not in this part
- Map<String, String> partsConsumerMap =
- consumerBandInfo.getPartitionInfoMap();
- if (consumerBandInfo.isBandConsume()
- && consumerBandInfo.isNotAllocate()
- && !partsConsumerMap.isEmpty()
- && consumerBandInfo.getAllocatedTimes() < 2) {
- bandGroupSet.add(group);
+ // deal with regular consumer allocation, bound consume not in this part
+ if (consumeGroupInfo.isUnReadyServerBalance()) {
+ boundGroupSet.add(group);
continue;
}
List<ConsumerInfo> newConsumerList = new ArrayList<>();
@@ -115,9 +106,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (CollectionUtils.isEmpty(newConsumerList)) {
continue;
}
- Set<String> topicSet = consumerBandInfo.getTopicSet();
- if (!consumerBandInfo.isBandConsume()
- && consumerBandInfo.getRebalanceCheckStatus() <= 0) {
+ Set<String> topicSet = consumeGroupInfo.getTopicSet();
+ if (consumeGroupInfo.needResourceCheck()) {
// check if current client meet minimal requirements
GroupResCtrlEntity offsetResetGroupEntity =
metaDataManager.confGetGroupResCtrlConf(group);
@@ -125,7 +115,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
&& offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
int allowRate = confAllowBClientRate > 0
- ? confAllowBClientRate : defAllowBClientRate;
+ ? confAllowBClientRate : consumerHolder.getDefResourceRate();
int maxBrokerCount =
brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
int curBClientRate = (int) Math.floor(maxBrokerCount / newConsumerList.size());
@@ -134,33 +124,33 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (maxBrokerCount % allowRate != 0) {
minClientCnt += 1;
}
- consumerHolder.setCurConsumeBClientInfo(group, defAllowBClientRate,
- confAllowBClientRate, curBClientRate, minClientCnt, false);
- if (consumerBandInfo.isRebalanceCheckPrint()) {
+ consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+ curBClientRate, minClientCnt, false);
+ if (consumeGroupInfo.isEnableBalanceChkPrint()) {
logger.info(strBuffer.append("[UnBound Alloc 2] Not allocate partition :group(")
.append(group).append(")'s consumer getCachedSize(")
- .append(consumerBandInfo.getGroupCnt())
+ .append(consumeGroupInfo.getGroupCnt())
.append(") low than min required client count:")
.append(minClientCnt).toString());
strBuffer.delete(0, strBuffer.length());
}
continue;
} else {
- consumerHolder.setCurConsumeBClientInfo(group,
- defAllowBClientRate, confAllowBClientRate, curBClientRate, -2, true);
+ consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+ curBClientRate, -2, true);
}
}
RebProcessInfo rebProcessInfo = new RebProcessInfo();
- if (!consumerBandInfo.isRebalanceMapEmpty()) {
+ if (!consumeGroupInfo.isBalanceMapEmpty()) {
rebProcessInfo = consumerHolder.getNeedRebNodeList(group);
if (!rebProcessInfo.isProcessInfoEmpty()) {
- rejGroupClientINfoMap.put(group, rebProcessInfo);
+ rejGroupClientInfoMap.put(group, rebProcessInfo);
}
}
List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
Map<String, Partition> partMap =
brokerRunManager.getSubBrokerAcceptSubParts(topicSet);
- Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
+ Map<String, NodeRebInfo> rebProcessInfoMap = consumeGroupInfo.getBalanceMap();
for (ConsumerInfo consumer : newConsumerList) {
Map<String, List<Partition>> partitions = new HashMap<>();
finalSubInfoMap.put(consumer.getConsumerId(), partitions);
@@ -176,7 +166,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
newConsumerList2.add(consumer);
}
for (Entry<String, Map<String, Partition>> entry : relation.entrySet()) {
- partitions.put(entry.getKey(), new ArrayList<Partition>());
+ partitions.put(entry.getKey(), new ArrayList<>());
}
continue;
}
@@ -223,19 +213,18 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
}
}
- if (!bandGroupSet.isEmpty()) {
- for (String group : bandGroupSet) {
+ if (!boundGroupSet.isEmpty()) {
+ for (String group : boundGroupSet) {
groupsNeedToBalance.remove(group);
}
}
if (!groupsNeedToBalance.isEmpty()) {
- finalSubInfoMap =
- balance(finalSubInfoMap, consumerHolder, brokerRunManager,
- groupsNeedToBalance, clusterState, rejGroupClientINfoMap);
+ balance(finalSubInfoMap, consumerHolder, brokerRunManager,
+ groupsNeedToBalance, clusterState, rejGroupClientInfoMap);
}
- if (!rejGroupClientINfoMap.isEmpty()) {
+ if (!rejGroupClientInfoMap.isEmpty()) {
for (Entry<String, RebProcessInfo> entry :
- rejGroupClientINfoMap.entrySet()) {
+ rejGroupClientInfoMap.entrySet()) {
consumerHolder.setRebNodeProcessed(entry.getKey(),
entry.getValue().needProcessList);
}
@@ -244,7 +233,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
// #lizard forgives
- private Map<String, Map<String, List<Partition>>> balance(
+ private void balance(
Map<String, Map<String, List<Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
BrokerRunManager brokerRunManager,
@@ -253,24 +242,24 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, RebProcessInfo> rejGroupClientInfoMap) {
// according to group
for (String group : groupSet) {
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(group);
+ if (consumeGroupInfo == null || consumeGroupInfo.isClientBalance()) {
continue;
}
// filter consumer which don't need to handle
List<ConsumerInfo> consumerList = new ArrayList<>();
- List<ConsumerInfo> consumerList1 = consumerBandInfo.getConsumerInfoList();
+ List<ConsumerInfo> consumerList1 = consumeGroupInfo.getConsumerInfoList();
RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
if (rebProcessInfo != null) {
for (ConsumerInfo consumerInfo : consumerList1) {
+ if (consumerInfo == null) {
+ continue;
+ }
if (rebProcessInfo.needProcessList.contains(consumerInfo.getConsumerId())
|| rebProcessInfo.needEscapeList.contains(consumerInfo.getConsumerId())) {
Map<String, List<Partition>> partitions2 =
- clusterState.get(consumerInfo.getConsumerId());
- if (partitions2 == null) {
- partitions2 = new HashMap<>();
- clusterState.put(consumerInfo.getConsumerId(), partitions2);
- }
+ clusterState.computeIfAbsent(
+ consumerInfo.getConsumerId(), k -> new HashMap<>());
Map<String, Map<String, Partition>> relation =
oldClusterState.get(consumerInfo.getConsumerId());
if (relation != null) {
@@ -289,7 +278,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
continue;
}
// sort consumer and partitions, then mod
- Set<String> topics = consumerBandInfo.getTopicSet();
+ Set<String> topics = consumeGroupInfo.getTopicSet();
Map<String, Partition> psPartMap =
brokerRunManager.getSubBrokerAcceptSubParts(topics);
int min = psPartMap.size() / consumerList.size();
@@ -359,24 +348,16 @@ public class DefaultLoadBalancer implements LoadBalancer {
assign(partitionToMove.poll(), clusterState, consumerId);
}
}
-
}
- return clusterState;
}
private void assign(Partition partition,
Map<String, Map<String, List<Partition>>> clusterState,
String consumerId) {
- Map<String, List<Partition>> partitions = clusterState.get(consumerId);
- if (partitions == null) {
- partitions = new HashMap<>();
- clusterState.put(consumerId, partitions);
- }
- List<Partition> ps = partitions.get(partition.getTopic());
- if (ps == null) {
- ps = new ArrayList<>();
- partitions.put(partition.getTopic(), ps);
- }
+ Map<String, List<Partition>> partitions =
+ clusterState.computeIfAbsent(consumerId, k -> new HashMap<>());
+ List<Partition> ps = partitions.computeIfAbsent(
+ partition.getTopic(), k -> new ArrayList<>());
ps.add(partition);
}
@@ -424,16 +405,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (searched == 0) {
return;
}
- Map<String, List<Partition>> partitions = clusterState.get(consumer.getConsumerId());
- if (partitions == null) {
- partitions = new HashMap<>();
- clusterState.put(consumer.getConsumerId(), partitions);
- }
- List<Partition> ps = partitions.get(partition.getTopic());
- if (ps == null) {
- ps = new ArrayList<>();
- partitions.put(partition.getTopic(), ps);
- }
+ Map<String, List<Partition>> partitions =
+ clusterState.computeIfAbsent(consumer.getConsumerId(), k -> new HashMap<>());
+ List<Partition> ps = partitions.computeIfAbsent(
+ partition.getTopic(), k -> new ArrayList<>());
ps.add(partition);
}
}
@@ -489,7 +464,6 @@ public class DefaultLoadBalancer implements LoadBalancer {
* @param brokerRunManager
* @param groupSet
* @param metaDataManager
- * @param defAllowBClientRate
* @param strBuffer
* @return
*/
@@ -499,40 +473,32 @@ public class DefaultLoadBalancer implements LoadBalancer {
BrokerRunManager brokerRunManager,
List<String> groupSet,
MetaDataManager metaDataManager,
- int defAllowBClientRate,
StringBuilder strBuffer) {
// #lizard forgives
// regular consumer allocate operation
Map<String, Map<String, List<Partition>>> finalSubInfoMap =
new HashMap<>();
for (String group : groupSet) {
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
- // filter empty consumer
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo =
+ consumerHolder.getConsumeGroupInfo(group);
+ if (consumeGroupInfo == null
+ || consumeGroupInfo.isClientBalance()
+ || consumeGroupInfo.isUnReadyServerBalance()) {
continue;
}
-
- List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
+ List<ConsumerInfo> consumerList = consumeGroupInfo.getConsumerInfoList();
if (CollectionUtils.isEmpty(consumerList)) {
continue;
}
- Map<String, String> partsConsumerMap =
- consumerBandInfo.getPartitionInfoMap();
- if (consumerBandInfo.isBandConsume()
- && consumerBandInfo.isNotAllocate()
- && !partsConsumerMap.isEmpty()
- && consumerBandInfo.getAllocatedTimes() < 2) {
- continue;
- }
// check if current client meet minimal requirements
- Set<String> topicSet = consumerBandInfo.getTopicSet();
+ Set<String> topicSet = consumeGroupInfo.getTopicSet();
GroupResCtrlEntity offsetResetGroupEntity =
metaDataManager.confGetGroupResCtrlConf(group);
int confAllowBClientRate = (offsetResetGroupEntity != null
&& offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
int allowRate = confAllowBClientRate > 0
- ? confAllowBClientRate : defAllowBClientRate;
+ ? confAllowBClientRate : consumerHolder.getDefResourceRate();
int maxBrokerCount =
brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
int curBClientRate = (int) Math.floor(maxBrokerCount / consumerList.size());
@@ -541,21 +507,20 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (maxBrokerCount % allowRate != 0) {
minClientCnt += 1;
}
- consumerHolder.setCurConsumeBClientInfo(group,
- defAllowBClientRate, confAllowBClientRate,
+ consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
curBClientRate, minClientCnt, false);
- if (consumerBandInfo.isRebalanceCheckPrint()) {
+ if (consumeGroupInfo.isEnableBalanceChkPrint()) {
logger.info(strBuffer.append("[UnBound Alloc 1] Not allocate partition :group(")
.append(group).append(")'s consumer getCachedSize(")
- .append(consumerBandInfo.getGroupCnt())
+ .append(consumeGroupInfo.getGroupCnt())
.append(") low than min required client count:")
.append(minClientCnt).toString());
strBuffer.delete(0, strBuffer.length());
}
continue;
} else {
- consumerHolder.setCurConsumeBClientInfo(group,
- defAllowBClientRate, confAllowBClientRate, curBClientRate, -2, true);
+ consumeGroupInfo.setConsumeResourceInfo(confAllowBClientRate,
+ curBClientRate, -2, true);
}
// sort and mod
Collections.sort(consumerList);
@@ -568,16 +533,9 @@ public class DefaultLoadBalancer implements LoadBalancer {
for (int i = 0; i < consumerList.size(); i++) {
String consumerId = consumerList.get(i).getConsumerId();
Map<String, List<Partition>> topicSubPartMap =
- finalSubInfoMap.get(consumerId);
- if (topicSubPartMap == null) {
- topicSubPartMap = new HashMap<>();
- finalSubInfoMap.put(consumerId, topicSubPartMap);
- }
- List<Partition> partList = topicSubPartMap.get(topic);
- if (partList == null) {
- partList = new ArrayList<>();
- topicSubPartMap.put(topic, partList);
- }
+ finalSubInfoMap.computeIfAbsent(consumerId, k -> new HashMap<>());
+ List<Partition> partList =
+ topicSubPartMap.computeIfAbsent(topic, k -> new ArrayList<>());
int startIndex = partsPerConsumer * i + Math.min(i, consumersWithExtraPart);
int parts = partsPerConsumer + ((i + 1) > consumersWithExtraPart ? 0 : 1);
for (int j = startIndex; j < startIndex + parts; j++) {
@@ -650,32 +608,23 @@ public class DefaultLoadBalancer implements LoadBalancer {
continue;
}
// filter band consumer
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(group);
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo =
+ consumerHolder.getConsumeGroupInfo(group);
+ if (consumeGroupInfo == null
+ || consumeGroupInfo.isGroupEmpty()
+ || consumeGroupInfo.isNotNeedBoundBalance()) {
continue;
}
- List<ConsumerInfo> consumerList = consumerBandInfo.getConsumerInfoList();
- if (CollectionUtils.isEmpty(consumerList)) {
- continue;
- }
- Map<String, String> partsConsumerMap =
- consumerBandInfo.getPartitionInfoMap();
- if (!consumerBandInfo.isBandConsume()
- || !consumerBandInfo.isNotAllocate()
- || partsConsumerMap.isEmpty()
- || consumerBandInfo.getAllocatedTimes() >= 2) {
- continue;
- }
- if (!consumerBandInfo.isGroupFullSize()) {
+ if (!consumeGroupInfo.isGroupFullSize()) {
// check if client size meet minimal requirements
Long checkCycle = consumerHolder.addCurCheckCycle(group);
if (isResetRebalance) {
if (checkCycle != null && checkCycle % 15 == 0) {
logger.info(strBuffer.append("[Bound Alloc 2] Not allocate partition :group(")
.append(group).append(")'s consumer getCachedSize(")
- .append(consumerBandInfo.getGroupCnt())
+ .append(consumeGroupInfo.getGroupCnt())
.append(") low than required source count:")
- .append(consumerBandInfo.getSourceCount())
+ .append(consumeGroupInfo.getSourceCount())
.append(", checked cycle is ")
.append(checkCycle).toString());
strBuffer.delete(0, strBuffer.length());
@@ -683,57 +632,35 @@ public class DefaultLoadBalancer implements LoadBalancer {
} else {
logger.info(strBuffer.append("[Bound Alloc 1] Not allocate partition :group(")
.append(group).append(")'s consumer getCachedSize(")
- .append(consumerBandInfo.getGroupCnt())
+ .append(consumeGroupInfo.getGroupCnt())
.append(") low than required source count:")
- .append(consumerBandInfo.getSourceCount()).toString());
+ .append(consumeGroupInfo.getSourceCount()).toString());
strBuffer.delete(0, strBuffer.length());
}
continue;
}
- // actual reset offset
- Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
- List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
- Map<String, Partition> partitionMap =
- brokerRunManager.getSubBrokerAcceptSubParts(consumerBandInfo.getTopicSet());
+ Map<String, Partition> partPubMap =
+ brokerRunManager.getSubBrokerAcceptSubParts(consumeGroupInfo.getTopicSet());
+ Map<String, Partition> partitionMap = new HashMap<>();
+ for (Partition partition : partPubMap.values()) {
+ partitionMap.put(partition.getPartitionKey(), partition);
+ }
+ Map<String, String> partsConsumerMap =
+ consumeGroupInfo.getPartitionInfoMap();
for (Entry<String, String> entry : partsConsumerMap.entrySet()) {
Partition foundPart = partitionMap.get(entry.getKey());
if (foundPart != null) {
- if (partsOffsetMap.get(entry.getKey()) != null) {
- offsetInfoList.add(new OffsetStorageInfo(foundPart.getTopic(),
- foundPart.getBroker().getBrokerId(),
- foundPart.getPartitionId(),
- partsOffsetMap.get(entry.getKey()), 0));
- }
String consumerId = entry.getValue();
Map<String, Map<String, Partition>> topicSubPartMap =
- finalSubInfoMap.get(consumerId);
- if (topicSubPartMap == null) {
- topicSubPartMap = new HashMap<>();
- finalSubInfoMap.put(consumerId, topicSubPartMap);
- }
- Map<String, Partition> partMap = topicSubPartMap.get(foundPart.getTopic());
- if (partMap == null) {
- partMap = new HashMap<>();
- topicSubPartMap.put(foundPart.getTopic(), partMap);
- }
+ finalSubInfoMap.computeIfAbsent(consumerId, k -> new HashMap<>());
+ Map<String, Partition> partMap =
+ topicSubPartMap.computeIfAbsent(
+ foundPart.getTopic(), k -> new HashMap<>());
partMap.put(foundPart.getPartitionKey(), foundPart);
partitionMap.remove(entry.getKey());
- } else {
- String[] partitionKeyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
- BrokerConfEntity brokerConfEntity =
- metaDataManager.getBrokerConfByBrokerId(Integer.parseInt(partitionKeyItems[0]));
- if (brokerConfEntity != null) {
- if (partsOffsetMap.get(entry.getKey()) != null) {
- offsetInfoList.add(new OffsetStorageInfo(partitionKeyItems[1],
- brokerConfEntity.getBrokerId(),
- Integer.parseInt(partitionKeyItems[2]),
- partsOffsetMap.get(entry.getKey()), 0));
- }
- }
}
}
- zkOffsetStorage.commitOffset(group, offsetInfoList, false);
- consumerHolder.addAllocatedTimes(group);
+ consumeGroupInfo.addAllocatedTimes();
}
return finalSubInfoMap;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
index e9f3983..b34faf9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/balance/LoadBalancer.java
@@ -19,11 +19,12 @@ package org.apache.inlong.tubemq.server.master.balance;
import java.util.List;
import java.util.Map;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
public interface LoadBalancer {
@@ -34,7 +35,6 @@ public interface LoadBalancer {
BrokerRunManager brokerRunManager,
List<String> groups,
MetaDataManager metaDataManager,
- int defAllowBClientRate,
StringBuilder sBuilder);
Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
@@ -50,7 +50,6 @@ public interface LoadBalancer {
BrokerRunManager brokerRunManager,
List<String> groups,
MetaDataManager metaDataManager,
- int defAllowBClientRate,
StringBuilder sBuilder);
Map<String, Map<String, Map<String, Partition>>> resetBukAssign(ConsumerInfoHolder consumerHolder,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index b68cd99..9ae2a42 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -1968,7 +1968,7 @@ public class MetaDataManager implements Server {
return metaStoreService.getConsumeCtrlByTopicName(topicName);
}
- public Set<String> getDisableConsumeTopicByGroupName(String groupName) {
+ public Set<String> getDisableTopicByGroupName(String groupName) {
Set<String> disTopicSet = new HashSet<>();
List<GroupConsumeCtrlEntity> qryResult =
metaStoreService.getConsumeCtrlByGroupName(groupName);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 56137a1..f646be9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -48,35 +48,52 @@ public class TopicPSInfoManager {
* @param topic query topic
* @return query result
*/
- public ConcurrentHashSet<String> getTopicSubInfo(String topic) {
+ public Set<String> getTopicSubInfo(String topic) {
return topicSubInfoMap.get(topic);
}
/**
- * Set groups for a topic
+ * Record a group as a subscriber of each topic in the given set
*
- * @param topic topic target
- * @param groupSet group subscribed
+ * @param groupName the group name
+ * @param topicSet the topic set which the group subscribed
*/
- public void setTopicSubInfo(String topic,
- ConcurrentHashSet<String> groupSet) {
- topicSubInfoMap.put(topic, groupSet);
+ public void addGroupSubTopicInfo(String groupName, Set<String> topicSet) {
+ for (String topic : topicSet) {
+ ConcurrentHashSet<String> groupSet = topicSubInfoMap.get(topic);
+ if (groupSet == null) {
+ ConcurrentHashSet<String> tmpGroupSet =
+ new ConcurrentHashSet<>();
+ groupSet = topicSubInfoMap.putIfAbsent(topic, tmpGroupSet);
+ if (groupSet == null) {
+ groupSet = tmpGroupSet;
+ }
+ }
+ groupSet.add(groupName);
+ }
}
/**
- * Remove a group from the group set for a specific topic
+ * Remove the group's topic set
*
- * @param topic topic condition
- * @param group group condition
- * @return true if removed, false if not record
+ * @param group the name of the group whose subscriptions need to be removed
+ * @param topicSet the topic set which the group subscribed
*/
- public boolean removeTopicSubInfo(String topic,
- String group) {
- ConcurrentHashSet<String> groupSet = getTopicSubInfo(topic);
- if (groupSet != null) {
- return groupSet.remove(group);
+ public void rmvGroupSubTopicInfo(String group, Set<String> topicSet) {
+ if (topicSet == null || group == null) {
+ return;
+ }
+ ConcurrentHashSet<String> groupSet;
+ for (String topic : topicSet) {
+ if (topic == null) {
+ continue;
+ }
+ groupSet = topicSubInfoMap.get(topic);
+ if (groupSet == null) {
+ continue;
+ }
+ groupSet.remove(group);
}
- return true;
}
/**
@@ -85,30 +102,17 @@ public class TopicPSInfoManager {
* @param topic query topic
* @return target published producerId set
*/
- public ConcurrentHashSet<String> getTopicPubInfo(String topic) {
+ public Set<String> getTopicPubInfo(String topic) {
return topicPubInfoMap.get(topic);
}
/**
- * Set producer IDs for a topic
- *
- * @param topic topic target
- * @param producerIdSet topic produce source
- * @return
- */
- public ConcurrentHashSet<String> setTopicPubInfo(String topic,
- ConcurrentHashSet<String> producerIdSet) {
- return topicPubInfoMap.putIfAbsent(topic, producerIdSet);
- }
-
- /**
* Add producer produce topic set
*
* @param producerId need add producer id
* @param topicList need add topic set
*/
- public void addProducerTopicPubInfo(final String producerId,
- final Set<String> topicList) {
+ public void addProducerTopicPubInfo(String producerId, Set<String> topicList) {
for (String topic : topicList) {
ConcurrentHashSet<String> producerIdSet =
topicPubInfoMap.get(topic);
@@ -121,9 +125,7 @@ public class TopicPSInfoManager {
producerIdSet = tmpProducerIdSet;
}
}
- if (!producerIdSet.contains(producerId)) {
- producerIdSet.add(producerId);
- }
+ producerIdSet.add(producerId);
}
}
@@ -133,8 +135,7 @@ public class TopicPSInfoManager {
* @param producerId need removed producer id
* @param topicList need removed topic set
*/
- public void rmvProducerTopicPubInfo(final String producerId,
- final Set<String> topicList) {
+ public void rmvProducerTopicPubInfo(String producerId, Set<String> topicList) {
if (topicList != null) {
for (String topic : topicList) {
if (topic != null) {
@@ -169,7 +170,7 @@ public class TopicPSInfoManager {
if (subTopicSet.isEmpty()) {
// get all online group
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
- List<String> onlineGroups = consumerHolder.getAllGroup();
+ List<String> onlineGroups = consumerHolder.getAllGroupName();
if (!onlineGroups.isEmpty()) {
if (qryGroupSet.isEmpty()) {
resultSet.addAll(onlineGroups);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
new file mode 100644
index 0000000..0238109
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeGroupInfo {
+ private static final Logger logger =
+ LoggerFactory.getLogger(ConsumeGroupInfo.class);
+ private final String groupName;
+ private final ConsumeType consumeType;
+ private final long createTime; //create time
+ private final Set<String> topicSet = new HashSet<>(); //topic set
+ private final Map<String, TreeSet<String>> topicConditions = //filter condition set
+ new HashMap<>();
+ private final ReadWriteLock csmInfoRWLock = new ReentrantReadWriteLock();
+ private final Map<String, ConsumerInfo> consumerInfoMap = //consumer info
+ new HashMap<>();
+ // session key; consumers of the same batch share the same session key
+ private String sessionKey = "";
+ // session start time
+ private long sessionTime = TBaseConstants.META_VALUE_UNDEFINED;
+ // consumer count (specified by the client)
+ private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
+ // select the bigger offset when offsets conflict
+ private boolean isSelectedBig = true;
+ // allocate offset flag
+ private final AtomicBoolean notAllocate =
+ new AtomicBoolean(true);
+ // current check cycle
+ private final AtomicLong curCheckCycle = new AtomicLong(0);
+ // allocate times
+ private final AtomicInteger allocatedTimes = new AtomicInteger(0);
+ // partition info
+ private final ConcurrentHashMap<String, String> partitionInfoMap =
+ new ConcurrentHashMap<>();
+ // partition offset
+ private final ConcurrentHashMap<String, Long> partOffsetMap =
+ new ConcurrentHashMap<>();
+ // load balance
+ private final ConcurrentHashMap<String, NodeRebInfo> balanceNodeMap =
+ new ConcurrentHashMap<>();
+ // config broker/client ratio
+ private int confResourceRate = TBaseConstants.META_VALUE_UNDEFINED;
+ // current broker/client ratio
+ private int curResourceRate = TBaseConstants.META_VALUE_UNDEFINED;
+ // minimal client count according to above ratio
+ private int minReqClientCnt = TBaseConstants.META_VALUE_UNDEFINED;
+ // rebalance check status
+ private int balanceChkStatus = TBaseConstants.META_VALUE_UNDEFINED;
+ // log print flag
+ private boolean enableBalanceChkPrint = true;
+ //
+ private final AtomicLong csmCtrlId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final AtomicLong topicMetaInfoId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final ConcurrentHashMap<String, String> topicMetaInfoMap =
+ new ConcurrentHashMap<>();
+ private final AtomicLong lastMetaInfoFreshTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+
+ public ConsumeGroupInfo(ConsumerInfo consumer) {
+ this.groupName = consumer.getGroupName();
+ this.consumeType = consumer.getConsumeType();
+ this.topicSet.addAll(consumer.getTopicSet());
+ this.topicConditions.putAll(consumer.getTopicConditions());
+ this.createTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Add consumer to consume group
+ *
+ * @param inConsumer consumer object
+ */
+ public boolean addConsumer(ConsumerInfo inConsumer,
+ StringBuilder sBuffer,
+ ParamCheckResult result) {
+ try {
+ csmInfoRWLock.writeLock().lock();
+ if (this.consumerInfoMap.isEmpty()) {
+ if (this.consumeType == ConsumeType.CONSUME_BAND) {
+ this.sessionKey = inConsumer.getSessionKey();
+ this.sessionTime = inConsumer.getStartTime();
+ this.sourceCount = inConsumer.getSourceCount();
+ this.isSelectedBig = inConsumer.isSelectedBig();
+ this.curCheckCycle.set(0);
+ } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+ this.sourceCount = inConsumer.getSourceCount();
+ }
+ } else {
+ if (!validConsumerInfo(inConsumer, sBuffer, result)) {
+ return false;
+ }
+ ConsumerInfo curConsumerInfo =
+ consumerInfoMap.get(inConsumer.getConsumerId());
+ if (curConsumerInfo != null) {
+ curConsumerInfo.updCurConsumerInfo(inConsumer);
+ result.setCheckData("Ok!");
+ return true;
+ }
+ }
+ this.consumerInfoMap.put(inConsumer.getConsumerId(), inConsumer);
+ if (consumeType == ConsumeType.CONSUME_BAND) {
+ bookPartitionInfo(inConsumer);
+ }
+ result.setCheckData("Ok!");
+ return true;
+ } finally {
+ csmInfoRWLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Remove consumer
+ *
+ * @param consumerId consumer id
+ * @return consumer object
+ */
+ public ConsumerInfo removeConsumer(String consumerId) {
+ if (consumerId == null) {
+ return null;
+ }
+ try {
+ csmInfoRWLock.writeLock().lock();
+ this.balanceNodeMap.remove(consumerId);
+ List<String> partKeyList = new ArrayList<>();
+ for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
+ if (entry.getValue() != null) {
+ if (entry.getValue().equals(consumerId)) {
+ partKeyList.add(entry.getKey());
+ }
+ }
+ }
+ for (String partKey : partKeyList) {
+ partitionInfoMap.remove(partKey);
+ partOffsetMap.remove(partKey);
+ }
+ return this.consumerInfoMap.remove(consumerId);
+ } finally {
+ csmInfoRWLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Get the number of consumers currently registered in the group
+ *
+ * @return the consumer count of the group
+ */
+ public int getGroupCnt() {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return this.consumerInfoMap.size();
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Add node balance info
+ *
+ * @param clientId the id of the client to add
+ * @param waitDuration wait duration
+ */
+ public void addNodeRelInfo(String clientId, int waitDuration) {
+ NodeRebInfo nodeRebInfo = this.balanceNodeMap.get(clientId);
+ if (nodeRebInfo != null) {
+ if (nodeRebInfo.getStatus() == 4) {
+ this.balanceNodeMap.remove(clientId);
+ } else {
+ return;
+ }
+ }
+ try {
+ csmInfoRWLock.readLock().lock();
+ if (consumerInfoMap.containsKey(clientId)) {
+ this.balanceNodeMap.putIfAbsent(clientId,
+ new NodeRebInfo(clientId, waitDuration));
+ }
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public RebProcessInfo getNeedBalanceNodes() {
+ List<String> needProcessList = new ArrayList<>();
+ List<String> needEscapeList = new ArrayList<>();
+ List<String> needRemoved = new ArrayList<>();
+ for (NodeRebInfo nodeRebInfo : this.balanceNodeMap.values()) {
+ if (nodeRebInfo.getStatus() == 0) {
+ nodeRebInfo.setStatus(1);
+ needProcessList.add(nodeRebInfo.getClientId());
+ } else {
+ if (nodeRebInfo.getReqType() == 1
+ && nodeRebInfo.getStatus() == 2) {
+ if (nodeRebInfo.decrAndGetWaitDuration() <= 0) {
+ nodeRebInfo.setStatus(4);
+ needRemoved.add(nodeRebInfo.getClientId());
+ } else {
+ needEscapeList.add(nodeRebInfo.getClientId());
+ }
+ }
+ }
+ }
+ for (String clientId : needRemoved) {
+ this.balanceNodeMap.remove(clientId);
+ }
+ return new RebProcessInfo(needProcessList, needEscapeList);
+ }
+
+ public void setBalanceNodeProcessed(List<String> processList) {
+ if (processList == null
+ || processList.isEmpty()
+ || this.balanceNodeMap.isEmpty()) {
+ return;
+ }
+ List<String> needRemoved = new ArrayList<>();
+ for (NodeRebInfo nodeRebInfo : this.balanceNodeMap.values()) {
+ if (processList.contains(nodeRebInfo.getClientId())) {
+ if (nodeRebInfo.getReqType() == 0) {
+ nodeRebInfo.setStatus(4);
+ needRemoved.add(nodeRebInfo.getClientId());
+ } else {
+ nodeRebInfo.setStatus(2);
+ }
+ }
+ }
+ for (String clientId : needRemoved) {
+ this.balanceNodeMap.remove(clientId);
+ }
+ }
+
+ public boolean isUnReadyServerBalance() {
+ return (consumeType == ConsumeType.CONSUME_BAND
+ && notAllocate.get()
+ && !partitionInfoMap.isEmpty()
+ && allocatedTimes.get() < 2);
+ }
+
+ public boolean isNotNeedBoundBalance() {
+ return (consumeType != ConsumeType.CONSUME_BAND
+ || !notAllocate.get()
+ || partitionInfoMap.isEmpty()
+ || allocatedTimes.get() >= 2);
+ }
+
+ public boolean needResourceCheck() {
+ return (consumeType == ConsumeType.CONSUME_NORMAL && balanceChkStatus <= 0);
+ }
+
+ public ConsumeType getConsumeType() {
+ return consumeType;
+ }
+
+ public boolean isClientBalance() {
+ return consumeType == ConsumeType.CONSUME_CLIENT_REB;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public String getSessionKey() {
+ return sessionKey;
+ }
+
+ public boolean isNotAllocate() {
+ return notAllocate.get();
+ }
+
+ public void settAllocated() {
+ this.notAllocate.compareAndSet(true, false);
+ }
+
+ public int getAllocatedTimes() {
+ return allocatedTimes.get();
+ }
+
+ public void addAllocatedTimes() {
+ this.allocatedTimes.incrementAndGet();
+ }
+
+ public boolean isGroupEmpty() {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return consumerInfoMap.isEmpty();
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public boolean isGroupFullSize() {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return this.consumerInfoMap.size() >= this.sourceCount;
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public boolean isBalanceMapEmpty() {
+ return this.balanceNodeMap.isEmpty();
+ }
+
+ public long getSessionTime() {
+ return sessionTime;
+ }
+
+ public int getSourceCount() {
+ return sourceCount;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public long getCurCheckCycle() {
+ return curCheckCycle.get();
+ }
+
+ public long addCurCheckCycle() {
+ return curCheckCycle.addAndGet(1);
+ }
+
+ /**
+ * Set current consumer broker/client ratio
+ *
+ * @param confResourceRate configured broker/client ratio
+ * @param curResourceRate current broker/client ratio
+ * @param minReqClientCnt minimal client count
+ * @param isRebalanced Whether balanced
+ */
+ public void setConsumeResourceInfo(int confResourceRate,
+ int curResourceRate,
+ int minReqClientCnt,
+ boolean isRebalanced) {
+ this.confResourceRate = confResourceRate;
+ this.curResourceRate = curResourceRate;
+ this.minReqClientCnt = minReqClientCnt;
+ this.enableBalanceChkPrint = false;
+ if (isRebalanced) {
+ this.balanceChkStatus = 1;
+ } else {
+ this.balanceChkStatus = 0;
+ }
+ }
+
+ public boolean isEnableBalanceChkPrint() {
+ return enableBalanceChkPrint;
+ }
+
+ public int getConfResourceRate() {
+ return confResourceRate;
+ }
+
+ public int getCurResourceRate() {
+ return curResourceRate;
+ }
+
+ public int getMinReqClientCnt() {
+ return minReqClientCnt;
+ }
+
+ public int getBalanceChkStatus() {
+ return balanceChkStatus;
+ }
+
+ public Set<String> getTopicSet() {
+ return this.topicSet;
+ }
+
+ public boolean isSelectedBig() {
+ return isSelectedBig;
+ }
+
+ public Map<String, TreeSet<String>> getTopicConditions() {
+ return this.topicConditions;
+ }
+
+ public List<ConsumerInfo> getConsumerInfoList() {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return new ArrayList<>(this.consumerInfoMap.values());
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public List<String> getConsumerIdList() {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return new ArrayList<>(this.consumerInfoMap.keySet());
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public ConsumerInfo getConsumerInfo(String consumerId) {
+ try {
+ csmInfoRWLock.readLock().lock();
+ return consumerInfoMap.get(consumerId);
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ }
+
+ public List<String> getConsumerViewInfos() {
+ List<String> result = new ArrayList<>();
+ try {
+ csmInfoRWLock.readLock().lock();
+ for (ConsumerInfo consumerInfo : this.consumerInfoMap.values()) {
+ if (consumerInfo == null) {
+ continue;
+ }
+ result.add(consumerInfo.getConsumerViewInfo());
+ }
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ public List<Tuple2<String, Boolean>> getConsumerIdAndTlsInfos() {
+ List<Tuple2<String, Boolean>> result = new ArrayList<>();
+ try {
+ csmInfoRWLock.readLock().lock();
+ for (ConsumerInfo consumerInfo : this.consumerInfoMap.values()) {
+ if (consumerInfo == null) {
+ continue;
+ }
+ result.add(consumerInfo.getConsumerIdAndTlsInfoTuple());
+ }
+ } finally {
+ csmInfoRWLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ public Map<String, String> getPartitionInfoMap() {
+ return this.partitionInfoMap;
+ }
+
+ public Map<String, Long> getPartOffsetMap() {
+ return this.partOffsetMap;
+ }
+
+ public Map<String, NodeRebInfo> getBalanceMap() {
+ return this.balanceNodeMap;
+ }
+
+ /**
+ * Record the partitions and offsets reported by a bound consumer
+ *
+ * @param consumer consumer info
+ */
+ private void bookPartitionInfo(ConsumerInfo consumer) {
+ if (consumeType != ConsumeType.CONSUME_BAND) {
+ return;
+ }
+ Map<String, Long> consumerPartMap = consumer.getRequiredPartition();
+ if (consumerPartMap == null || consumerPartMap.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<String, Long> entry : consumerPartMap.entrySet()) {
+ String oldClientId = this.partitionInfoMap.get(entry.getKey());
+ if (oldClientId == null) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ } else {
+ ConsumerInfo oldConsumerInfo = this.consumerInfoMap.get(oldClientId);
+ if (oldConsumerInfo == null) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ } else {
+ Map<String, Long> oldConsumerPartMap = oldConsumerInfo.getRequiredPartition();
+ if (oldConsumerPartMap == null || oldConsumerPartMap.isEmpty()) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ } else {
+ Long oldConsumerOff = oldConsumerPartMap.get(entry.getKey());
+ if (oldConsumerOff == null) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ } else {
+ if (this.isSelectedBig) {
+ if (entry.getValue() >= oldConsumerOff) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ }
+ } else {
+ if (entry.getValue() < oldConsumerOff) {
+ this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
+ this.partOffsetMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Check the validity of consumer's parameters
+ *
+ * @param inConsumer consumer info
+ * @param sBuffer string buffer
+ * @param result process result
+ * @return true if valid, or false if invalid
+ */
+ private boolean validConsumerInfo(ConsumerInfo inConsumer,
+ StringBuilder sBuffer,
+ ParamCheckResult result) {
+ // check whether the consumer behavior is consistent
+ if (inConsumer.getConsumeType() != this.consumeType) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append(" using ").append(inConsumer.getConsumeType().getName())
+ .append(" subscribe is inconsistency with other consumers using ")
+ .append(this.consumeType.getName())
+ .append(" subscribe in the group");
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ sBuffer.delete(0, sBuffer.length());
+ return false;
+ }
+ // check the topics of consumption
+ if (CollectionUtils.isNotEmpty(topicSet)
+ && (topicSet.size() != inConsumer.getTopicSet().size()
+ || !topicSet.containsAll(inConsumer.getTopicSet()))) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append(" subscribed topics ").append(inConsumer.getTopicSet())
+ .append(" is inconsistency with other consumers in the group, existedTopics: ")
+ .append(topicSet);
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ sBuffer.delete(0, sBuffer.length());
+ return false;
+ }
+ // check the topic conditions of consumption
+ boolean isCondEqual = true;
+ if (topicConditions == null || topicConditions.isEmpty()) {
+ if (!inConsumer.getTopicConditions().isEmpty()) {
+ isCondEqual = false;
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append(" subscribe with filter condition ")
+ .append(inConsumer.getTopicConditions())
+ .append(" is inconsistency with other consumers in the group: topic without conditions");
+ }
+ } else {
+ // check the filter conditions of the topic
+ if (inConsumer.getTopicConditions().isEmpty()) {
+ isCondEqual = false;
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append(" subscribe without filter condition ")
+ .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
+ .append(topicConditions);
+ } else {
+ Set<String> existedCondTopics = topicConditions.keySet();
+ Set<String> reqCondTopics = inConsumer.getTopicConditions().keySet();
+ if (existedCondTopics.size() != reqCondTopics.size()
+ || !existedCondTopics.containsAll(reqCondTopics)) {
+ isCondEqual = false;
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append(" subscribe with filter condition ")
+ .append(inConsumer.getTopicConditions())
+ .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
+ .append(topicConditions);
+ } else {
+ for (String topicKey : existedCondTopics) {
+ if ((topicConditions.get(topicKey).size()
+ != inConsumer.getTopicConditions().get(topicKey).size())
+ || (!topicConditions.get(topicKey).containsAll(
+ inConsumer.getTopicConditions().get(topicKey)))) {
+ isCondEqual = false;
+ sBuffer.append("[Inconsistency subscribe] ")
+ .append(inConsumer.getConsumerId())
+ .append(" subscribe with filter condition ")
+ .append(inConsumer.getTopicConditions())
+ .append(" is inconsistency with other consumers in the group,")
+ .append(" existed topic conditions is ")
+ .append(topicConditions);
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (!isCondEqual) {
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ // Check the validity of bound consumer's parameters
+ if (this.consumeType == ConsumeType.CONSUME_BAND) {
+ if (!validBoundParameters(inConsumer, sBuffer, result)) {
+ return false;
+ }
+ } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
+ if (this.sourceCount > 0) {
+ if (this.sourceCount != inConsumer.getSourceCount()) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
+ .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ boolean foundEqual = false;
+ String occupiedConsumerId = null;
+ for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
+ if (consumerInfo == null) {
+ continue;
+ }
+ if (consumerInfo.getNodeId() == inConsumer.getNodeId()
+ && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
+ foundEqual = true;
+ occupiedConsumerId = consumerInfo.getConsumerId();
+ break;
+ }
+ }
+ if (foundEqual) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append("'s nodeId value(").append(inConsumer.getNodeId())
+ .append(") is occupied by ").append(occupiedConsumerId)
+ .append(" in the group!");
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ }
+ }
+ result.setCheckData("Ok");
+ return true;
+ }
+
+ /**
+ * Check the validity of bound consumer's parameters
+ *
+ * @param inConsumer consumer info
+ * @param sBuffer string buffer
+ * @param result process result
+ * @return true if valid, or false if invalid
+ */
+ private boolean validBoundParameters(ConsumerInfo inConsumer,
+ StringBuilder sBuffer,
+ ParamCheckResult result) {
+ if (consumeType != ConsumeType.CONSUME_BAND) {
+ result.setCheckData("");
+ return true;
+ }
+ // If the sessionKey is inconsistent, it means that the previous round of consumption has not completely
+ // exited. To avoid an incomplete offset setting, the data of the previous round must be completely
+ // cleared before the offsets are reset and this round of consumption starts
+ if (!sessionKey.equals(inConsumer.getSessionKey())) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append("'s sessionKey is inconsistency with other consumers in the group, required is ")
+ .append(sessionKey).append(", request is ").append(inConsumer.getSessionKey());
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ // check the offset config
+ if (isSelectedBig != inConsumer.isSelectedBig()) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append("'s isSelectBig is inconsistency with other consumers in the group, required is ")
+ .append(isSelectedBig).append(", request is ").append(inConsumer.isSelectedBig());
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ // check the consumers count
+ if (sourceCount != inConsumer.getSourceCount()) {
+ sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
+ .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
+ result.setCheckResult(false,
+ TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
+ logger.warn(sBuffer.toString());
+ return false;
+ }
+ result.setCheckData("Ok");
+ return true;
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java
new file mode 100644
index 0000000..b390d41
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeType.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+public enum ConsumeType {
+ CONSUME_NORMAL(0, "unbound", "Normal consume without reset offset"),
+ CONSUME_BAND(1, "bound", "Consume data with reset offset"),
+ CONSUME_CLIENT_REB(2, "client-rebalance", "Consume data with client assigned partitions");
+
+ private final int code;
+ private final String name;
+ private final String description;
+
+ ConsumeType(int code, String name, String description) {
+ this.code = code;
+ this.name = name;
+ this.description = description;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public static ConsumeType valueOf(int code) {
+ for (ConsumeType csmType : ConsumeType.values()) {
+ if (csmType.getCode() == code) {
+ return csmType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("unknown ConsumeType code %s", code));
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
deleted file mode 100644
index b8cdda6..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
-
-public class ConsumerBandInfo {
-
- private boolean isBandConsume = false;
-
- //session key, the same batch consumer have the same session key
- private String sessionKey = "";
- private long sessionTime = -1; //session start time
- private int sourceCount = 0; //consumer count(specific by client)
- private boolean isSelectedBig = true; //select the bigger offset when offset conflict
- private AtomicBoolean notAllocate = new AtomicBoolean(true); //allocate offset flag
- private AtomicLong curCheckCycle = new AtomicLong(0); //current check cycle
- private AtomicInteger allocatedTimes = new AtomicInteger(0); //allocate times
- private long createTime = System.currentTimeMillis(); //create time
- private Set<String> topicSet = new HashSet<>(); //topic set
- private Map<String, TreeSet<String>> topicConditions = //filter condition set
- new HashMap<>();
- private ConcurrentHashMap<String, ConsumerInfo> consumerInfoMap = //consumer info
- new ConcurrentHashMap<>();
- private ConcurrentHashMap<String, String> partitionInfoMap = //partition info
- new ConcurrentHashMap<>();
- private ConcurrentHashMap<String, Long> partOffsetMap = //partition offset
- new ConcurrentHashMap<>();
- private ConcurrentHashMap<String, NodeRebInfo> rebalanceMap = //load balance
- new ConcurrentHashMap<>();
- private int defBClientRate = -2; //default broker/client ratio
- private int confBClientRate = -2; //config broker/client ratio
- private int curBClientRate = -2; //current broker/client ratio
- private int minRequireClientCnt = -2; //minimal client count according to above ratio
- private int rebalanceCheckStatus = -2; //rebalance check status
- private boolean rebalanceCheckPrint = true; //log print flag
-
- public ConsumerBandInfo(boolean isSelectedBig) {
- this.sessionKey = "";
- this.sessionTime = -1;
- this.sourceCount = -1;
- this.curCheckCycle.set(0);
- this.allocatedTimes.set(0);
- this.notAllocate.set(true);
- this.isBandConsume = false;
- this.isSelectedBig = isSelectedBig;
- }
-
- public ConsumerBandInfo(boolean isBandConsume, String sessionKey,
- long sessionTime, int sourceCount, long createTime,
- long curCheckCycle, boolean notAllocate, int allocatedTimes,
- boolean isSelectedBig, Set<String> topicSet,
- Map<String, TreeSet<String>> topicConditions,
- Map<String, ConsumerInfo> consumerInfoMap,
- Map<String, String> partitionInfoMap,
- Map<String, Long> partOffsetMap,
- Map<String, NodeRebInfo> rebalanceMap,
- int defBClientRate, int confBClientRate,
- int curBClientRate, int minRequireClientCnt,
- int rebalanceCheckStatus, boolean rebalanceCheckPrint) {
- this.isBandConsume = isBandConsume;
- this.sessionKey = sessionKey;
- this.sessionTime = sessionTime;
- this.sourceCount = sourceCount;
- this.createTime = createTime;
- this.isSelectedBig = isSelectedBig;
- this.notAllocate.set(notAllocate);
- this.allocatedTimes.set(allocatedTimes);
- this.curCheckCycle.set(curCheckCycle);
- this.defBClientRate = defBClientRate;
- this.confBClientRate = confBClientRate;
- this.curBClientRate = curBClientRate;
- this.minRequireClientCnt = minRequireClientCnt;
- this.rebalanceCheckStatus = rebalanceCheckStatus;
- this.rebalanceCheckPrint = rebalanceCheckPrint;
- this.topicSet.addAll(topicSet);
- for (Map.Entry<String, ConsumerInfo> entry : consumerInfoMap.entrySet()) {
- this.consumerInfoMap.put(entry.getKey(), entry.getValue().clone());
- }
- for (Map.Entry<String, TreeSet<String>> entry : topicConditions.entrySet()) {
- this.topicConditions.put(entry.getKey(), entry.getValue());
- }
- for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
- if (entry.getValue() != null) {
- this.partitionInfoMap.put(entry.getKey(), entry.getValue());
- }
- }
- for (Map.Entry<String, Long> entry : partOffsetMap.entrySet()) {
- if (entry.getValue() != null) {
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- }
- }
- if (rebalanceMap != null) {
- for (Map.Entry<String, NodeRebInfo> entry : rebalanceMap.entrySet()) {
- if (entry.getValue() != null) {
- this.rebalanceMap.put(entry.getKey(), entry.getValue().clone());
- }
- }
- }
- }
-
- /**
- * Add consumer
- *
- * @param consumer
- */
- public void addConsumer(ConsumerInfo consumer) {
- if (this.consumerInfoMap.isEmpty()) {
- this.isBandConsume = consumer.isRequireBound();
- if (this.isBandConsume) {
- this.sessionKey = consumer.getSessionKey();
- this.sessionTime = consumer.getStartTime();
- this.sourceCount = consumer.getSourceCount();
- this.createTime = System.currentTimeMillis();
- this.curCheckCycle.set(0);
- }
- this.topicSet.addAll(consumer.getTopicSet());
- for (Map.Entry<String, TreeSet<String>> entry
- : consumer.getTopicConditions().entrySet()) {
- this.topicConditions.put(entry.getKey(), entry.getValue());
- }
- }
- this.consumerInfoMap.put(consumer.getConsumerId(), consumer);
- Map<String, Long> consumerPartMap = consumer.getRequiredPartition();
- if (!this.isBandConsume
- || consumerPartMap == null
- || consumerPartMap.isEmpty()) {
- return;
- }
- for (Map.Entry<String, Long> entry : consumerPartMap.entrySet()) {
- String oldClientId = this.partitionInfoMap.get(entry.getKey());
- if (oldClientId == null) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- } else {
- ConsumerInfo oldConsumerInfo = this.consumerInfoMap.get(oldClientId);
- if (oldConsumerInfo == null) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- } else {
- Map<String, Long> oldConsumerPartMap = oldConsumerInfo.getRequiredPartition();
- if (oldConsumerPartMap == null || oldConsumerPartMap.isEmpty()) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- } else {
- Long oldConsumerOff = oldConsumerPartMap.get(entry.getKey());
- if (oldConsumerOff == null) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- } else {
- if (this.isSelectedBig) {
- if (entry.getValue() >= oldConsumerOff) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- }
- } else {
- if (entry.getValue() < oldConsumerOff) {
- this.partitionInfoMap.put(entry.getKey(), consumer.getConsumerId());
- this.partOffsetMap.put(entry.getKey(), entry.getValue());
- }
- }
- }
- }
- }
- }
- }
-
- }
-
- /**
- * Remove consumer
- *
- * @param consumerId
- * @return
- */
- public ConsumerInfo removeConsumer(String consumerId) {
- if (consumerId == null) {
- return null;
- }
- this.rebalanceMap.remove(consumerId);
- List<String> partKeyList = new ArrayList<>();
- for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
- if (entry.getValue() != null) {
- if (entry.getValue().equals(consumerId)) {
- partKeyList.add(entry.getKey());
- }
- }
- }
- for (String partKey : partKeyList) {
- partitionInfoMap.remove(partKey);
- partOffsetMap.remove(partKey);
- }
- return this.consumerInfoMap.remove(consumerId);
- }
-
- public void clear() {
- this.isBandConsume = false;
- this.notAllocate.set(true);
- this.sessionKey = "";
- this.sessionTime = -1;
- this.sourceCount = -1;
- this.createTime = -1;
- this.curCheckCycle.set(0);
- this.allocatedTimes.set(0);
- this.topicSet.clear();
- this.consumerInfoMap.clear();
- this.topicConditions.clear();
- this.partitionInfoMap.clear();
- this.partOffsetMap.clear();
- this.rebalanceMap.clear();
- }
-
- /**
- * Get consumer group count
- *
- * @return group count
- */
- public int getGroupCnt() {
- return this.consumerInfoMap.size();
- }
-
- /**
- * Add node rebalance info
- *
- * @param clientId
- * @param waitDuration
- * @return
- */
- public NodeRebInfo addNodeRelInfo(String clientId, int waitDuration) {
- NodeRebInfo nodeRebInfo = this.rebalanceMap.get(clientId);
- if (nodeRebInfo != null) {
- if (nodeRebInfo.getStatus() == 4) {
- this.rebalanceMap.remove(clientId);
- nodeRebInfo = null;
- } else {
- return nodeRebInfo;
- }
- }
- if (consumerInfoMap.containsKey(clientId)) {
- NodeRebInfo tmpNodeInfo = new NodeRebInfo(clientId, waitDuration);
- nodeRebInfo = this.rebalanceMap.putIfAbsent(clientId, tmpNodeInfo);
- if (nodeRebInfo == null) {
- nodeRebInfo = tmpNodeInfo;
- }
- }
- return nodeRebInfo;
- }
-
- public RebProcessInfo getNeedRebNodeList() {
- List<String> needProcessList = new ArrayList<>();
- List<String> needEscapeList = new ArrayList<>();
- List<String> needRemoved = new ArrayList<>();
- for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
- if (nodeRebInfo.getStatus() == 0) {
- nodeRebInfo.setStatus(1);
- needProcessList.add(nodeRebInfo.getClientId());
- } else {
- if (nodeRebInfo.getReqType() == 1
- && nodeRebInfo.getStatus() == 2) {
- if (nodeRebInfo.decrAndGetWaitDuration() <= 0) {
- nodeRebInfo.setStatus(4);
- needRemoved.add(nodeRebInfo.getClientId());
- } else {
- needEscapeList.add(nodeRebInfo.getClientId());
- }
- }
- }
- }
- for (String clientId : needRemoved) {
- this.rebalanceMap.remove(clientId);
- }
- return new RebProcessInfo(needProcessList, needEscapeList);
- }
-
- public void setRebNodeProcessed(List<String> processList) {
- if (processList == null
- || processList.isEmpty()
- || this.rebalanceMap.isEmpty()) {
- return;
- }
- List<String> needRemoved = new ArrayList<>();
- for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
- if (processList.contains(nodeRebInfo.getClientId())) {
- if (nodeRebInfo.getReqType() == 0) {
- nodeRebInfo.setStatus(4);
- needRemoved.add(nodeRebInfo.getClientId());
- } else {
- nodeRebInfo.setStatus(2);
- }
- }
- }
- for (String clientId : needRemoved) {
- this.rebalanceMap.remove(clientId);
- }
- return;
- }
-
- public boolean isBandConsume() {
- return isBandConsume;
- }
-
- public String getSessionKey() {
- return sessionKey;
- }
-
- public boolean isNotAllocate() {
- return notAllocate.get();
- }
-
- public int getAllocatedTimes() {
- return allocatedTimes.get();
- }
-
- public void addAllocatedTimes() {
- this.allocatedTimes.incrementAndGet();
- }
-
- public boolean isGroupFullSize() {
- return this.consumerInfoMap.size() >= this.sourceCount;
- }
-
- public boolean isRebalanceMapEmpty() {
- return (this.rebalanceMap == null
- || this.rebalanceMap.isEmpty());
- }
-
- public boolean isBandPartsEmpty() {
- return this.partitionInfoMap.isEmpty();
- }
-
- public void settAllocated() {
- this.notAllocate.compareAndSet(true, false);
- }
-
- public long getSessionTime() {
- return sessionTime;
- }
-
- public int getSourceCount() {
- return sourceCount;
- }
-
- public long getCreateTime() {
- return createTime;
- }
-
- public long getCurCheckCycle() {
- return curCheckCycle.get();
- }
-
- public long addCurCheckCycle() {
- return curCheckCycle.addAndGet(1);
- }
-
- /**
- * Set current consumer broker/client ratio
- *
- * @param defBClientRate
- * @param confBClientRate
- * @param curBClientRate
- * @param minRequireClientCnt
- * @param isRebalanced
- */
- public void setCurrConsumeBClientInfo(int defBClientRate,
- int confBClientRate,
- int curBClientRate,
- int minRequireClientCnt,
- boolean isRebalanced) {
- this.defBClientRate = defBClientRate;
- this.confBClientRate = confBClientRate;
- this.curBClientRate = curBClientRate;
- this.minRequireClientCnt = minRequireClientCnt;
- this.rebalanceCheckPrint = false;
- if (isRebalanced) {
- this.rebalanceCheckStatus = 1;
- } else {
- this.rebalanceCheckStatus = 0;
- }
- }
-
- public boolean isRebalanceCheckPrint() {
- return rebalanceCheckPrint;
- }
-
- public AtomicBoolean getNotAllocate() {
- return notAllocate;
- }
-
- public int getDefBClientRate() {
- return defBClientRate;
- }
-
- public int getConfBClientRate() {
- return confBClientRate;
- }
-
- public int getCurBClientRate() {
- return curBClientRate;
- }
-
- public int getMinRequireClientCnt() {
- return minRequireClientCnt;
- }
-
- public int getRebalanceCheckStatus() {
- return rebalanceCheckStatus;
- }
-
- public Set<String> getTopicSet() {
- return this.topicSet;
- }
-
- public boolean isSelectedBig() {
- return isSelectedBig;
- }
-
- public Map<String, TreeSet<String>> getTopicConditions() {
- return this.topicConditions;
- }
-
- public List<ConsumerInfo> getConsumerInfoList() {
- List<ConsumerInfo> result = new ArrayList<>();
- result.addAll(this.consumerInfoMap.values());
- return result;
- }
-
- public List<ConsumerInfo> cloneConsumerInfoList() {
- List<ConsumerInfo> result = new ArrayList<>();
- for (ConsumerInfo consumer : this.consumerInfoMap.values()) {
- if (consumer != null) {
- result.add(consumer.clone());
- }
- }
- return result;
- }
-
- public ConsumerInfo getConsumerInfo(String consumerId) {
- return consumerInfoMap.get(consumerId);
- }
-
- public Map<String, String> getPartitionInfoMap() {
- return this.partitionInfoMap;
- }
-
- public Map<String, Long> getPartOffsetMap() {
- return this.partOffsetMap;
- }
-
- public Map<String, NodeRebInfo> getRebalanceMap() {
- return this.rebalanceMap;
- }
-
- @Override
- public ConsumerBandInfo clone() {
- // no need to deep clone
- return new ConsumerBandInfo(this.isBandConsume, this.sessionKey,
- this.sessionTime, this.sourceCount,
- this.createTime, this.curCheckCycle.get(),
- this.notAllocate.get(), this.allocatedTimes.get(),
- this.isSelectedBig,
- this.topicSet, this.topicConditions,
- this.consumerInfoMap, this.partitionInfoMap,
- this.partOffsetMap, this.rebalanceMap,
- this.defBClientRate, this.confBClientRate,
- this.curBClientRate, this.minRequireClientCnt,
- this.rebalanceCheckStatus, this.rebalanceCheckPrint);
- }
-
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index 9622c91..e3b0046 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.balance.ConsumerEvent;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +89,7 @@ public class ConsumerEventManager {
*/
public ConsumerEvent peek(String consumerId) {
String group =
- consumerHolder.getGroup(consumerId);
+ consumerHolder.getGroupName(consumerId);
if (group != null) {
ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
hasDisconnectEvent(group)
@@ -119,7 +118,7 @@ public class ConsumerEventManager {
*/
public ConsumerEvent removeFirst(String consumerId) {
ConsumerEvent event = null;
- String group = consumerHolder.getGroup(consumerId);
+ String group = consumerHolder.getGroupName(consumerId);
ConcurrentHashMap<String, LinkedList<ConsumerEvent>> currentEventMap =
hasDisconnectEvent(group) ? disconnectEventMap : connectEventMap;
LinkedList<ConsumerEvent> eventList = currentEventMap.get(consumerId);
@@ -211,16 +210,19 @@ public class ConsumerEventManager {
/**
* Check if disconnect event map have event
*
- * @param group
+ * @param group the name of the group to query
* @return true if disconnect event map not empty otherwise false
*/
public boolean hasDisconnectEvent(String group) {
- List<ConsumerInfo> consumerList =
- consumerHolder.getConsumerList(group);
- if (CollectionUtils.isNotEmpty(consumerList)) {
- for (ConsumerInfo consumer : consumerList) {
+ List<String> consumerIdList =
+ consumerHolder.getConsumerIdList(group);
+ if (CollectionUtils.isNotEmpty(consumerIdList)) {
+ for (String consumerId : consumerIdList) {
+ if (consumerId == null) {
+ continue;
+ }
List<ConsumerEvent> eventList =
- disconnectEventMap.get(consumer.getConsumerId());
+ disconnectEventMap.get(consumerId);
if (eventList != null) {
synchronized (eventList) {
if (CollectionUtils.isNotEmpty(eventList)) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
new file mode 100644
index 0000000..4fbcedf
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo;
+
+/**
+ * Describes a single consumer client: its id, group, subscribed topics and
+ * the attributes reported for its consume type.
+ *
+ * <p>Identity is defined by the consumer id alone: {@link #equals(Object)} and
+ * {@link #hashCode()} both use only {@code consumerId}.
+ * {@link #compareTo(ConsumerInfo)} additionally orders by group name when the
+ * ids are equal, so it can return non-zero for objects that are
+ * {@code equals}.
+ */
+public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
+
+    private static final long serialVersionUID = 3095734962491009712L;
+
+    private final String consumerId;
+    // snapshot of toString() taken at construction time
+    private final String consumerViewInfo;
+    private final ConsumeType consumeType;
+    private final String group;
+    private final Set<String> topicSet;
+    // never null: replaced by an empty map when the caller passes null
+    private final Map<String, TreeSet<String>> topicConditions;
+    private boolean overTLS = false;
+    private long startTime = TBaseConstants.META_VALUE_UNDEFINED;
+    private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
+    private int nodeId = TBaseConstants.META_VALUE_UNDEFINED;
+    // for band consume type node
+    private String sessionKey = "";
+    // select the bigger offset when offset conflict
+    private boolean selectedBig = true;
+    // presumably partition key -> requested offset; verify against callers
+    private Map<String, Long> requiredPartition;
+    // for client balance node
+    private long csmFromMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+    private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
+    private long usedTopicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+
+    /**
+     * Creates a consumer record from explicitly supplied session attributes.
+     *
+     * @param consumerId        the consumer id; must not be null, it is used by
+     *                          {@code equals}, {@code hashCode} and {@code compareTo}
+     * @param overTLS           whether the consumer is connected over TLS
+     * @param group             the consumer group name
+     * @param topicSet          the subscribed topics; must not be null because it
+     *                          is iterated while building the view string
+     * @param topicConditions   per-topic filter conditions; {@code null} is
+     *                          replaced by an empty map
+     * @param consumeType       the consume type of this consumer
+     * @param sessionKey        the session key
+     * @param startTime         the start time
+     * @param sourceCount       the source count
+     * @param selectedBig       whether the bigger offset wins on conflict
+     * @param requiredPartition the required partition map, stored as given
+     */
+    public ConsumerInfo(String consumerId, boolean overTLS, String group,
+                        Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
+                        ConsumeType consumeType, String sessionKey, long startTime,
+                        int sourceCount, boolean selectedBig,
+                        Map<String, Long> requiredPartition) {
+        this.group = group;
+        this.consumeType = consumeType;
+        this.consumerId = consumerId;
+        this.overTLS = overTLS;
+        this.topicSet = topicSet;
+        if (topicConditions == null) {
+            this.topicConditions = new HashMap<>();
+        } else {
+            this.topicConditions = topicConditions;
+        }
+        this.sessionKey = sessionKey;
+        this.selectedBig = selectedBig;
+        this.startTime = startTime;
+        this.sourceCount = sourceCount;
+        this.requiredPartition = requiredPartition;
+        this.consumerViewInfo = toString();
+    }
+
+    /**
+     * Creates a consumer record from client-reported synchronisation data.
+     * The start time is set to the current system time.
+     *
+     * @param consumerId      the consumer id; must not be null
+     * @param overTLS         whether the consumer is connected over TLS
+     * @param group           the consumer group name
+     * @param consumeType     the consume type of this consumer
+     * @param sourceCount     the source count
+     * @param nodeId          the node id
+     * @param topicSet        the subscribed topics; must not be null
+     * @param topicConditions per-topic filter conditions; {@code null} is
+     *                        replaced by an empty map
+     * @param curCsmCtrlId    the current consume-control id; stored as is unless
+     *                        it equals {@code TBaseConstants.META_VALUE_UNDEFINED}
+     * @param syncInfo        the client sync info; must not be null, its last
+     *                        assigned time and topic meta-info id are read
+     */
+    public ConsumerInfo(String consumerId, boolean overTLS, String group,
+                        ConsumeType consumeType, int sourceCount, int nodeId,
+                        Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
+                        long curCsmCtrlId, ClientSyncInfo syncInfo) {
+        this.group = group;
+        this.consumeType = consumeType;
+        this.consumerId = consumerId;
+        this.overTLS = overTLS;
+        this.topicSet = topicSet;
+        if (topicConditions == null) {
+            this.topicConditions = new HashMap<>();
+        } else {
+            this.topicConditions = topicConditions;
+        }
+        this.sourceCount = sourceCount;
+        this.nodeId = nodeId;
+        this.startTime = System.currentTimeMillis();
+        if (curCsmCtrlId != TBaseConstants.META_VALUE_UNDEFINED) {
+            this.csmFromMaxOffsetCtrlId = curCsmCtrlId;
+        }
+        updClientReportInfo(curCsmCtrlId,
+                syncInfo.getLstAssignedTime(), syncInfo.getTopicMetaInfoId());
+        this.consumerViewInfo = toString();
+    }
+
+    /**
+     * Refreshes the mutable attributes of this record from another record.
+     * Nothing is changed unless {@code inCsmInfo} has consume type
+     * {@link ConsumeType#CONSUME_CLIENT_REB}.
+     *
+     * @param inCsmInfo the record holding the newly reported values
+     */
+    public void updCurConsumerInfo(ConsumerInfo inCsmInfo) {
+        if (inCsmInfo.getConsumeType() != ConsumeType.CONSUME_CLIENT_REB) {
+            return;
+        }
+        // NOTE(review): consumerViewInfo is not rebuilt here, so the
+        // "@overTLS=" part of the view string can become stale - confirm
+        // whether that is intended.
+        this.overTLS = inCsmInfo.overTLS;
+        this.nodeId = inCsmInfo.getNodeId();
+        updClientReportInfo(inCsmInfo.getCsmFromMaxOffsetCtrlId(),
+                inCsmInfo.getLstAssignedTime(), inCsmInfo.getUsedTopicMetaInfoId());
+    }
+
+    /**
+     * Stores the values reported by the client, ignoring the ones that are
+     * not set.
+     *
+     * @param lstCsmCtrlId        the consume-control id; applied only when it is
+     *                            greater than or equal to 0
+     * @param lastAssignedTime    the last assigned time; ignored when it equals
+     *                            {@code TBaseConstants.META_VALUE_UNDEFINED}
+     * @param usedTopicMetaInfoId the used topic meta-info id; ignored when it
+     *                            equals {@code TBaseConstants.META_VALUE_UNDEFINED}
+     */
+    public void updClientReportInfo(long lstCsmCtrlId,
+                                    long lastAssignedTime,
+                                    long usedTopicMetaInfoId) {
+        if (lstCsmCtrlId >= 0) {
+            this.csmFromMaxOffsetCtrlId = lstCsmCtrlId;
+        }
+        if (lastAssignedTime != TBaseConstants.META_VALUE_UNDEFINED) {
+            this.lstAssignedTime = lastAssignedTime;
+        }
+        if (usedTopicMetaInfoId != TBaseConstants.META_VALUE_UNDEFINED) {
+            this.usedTopicMetaInfoId = usedTopicMetaInfoId;
+        }
+    }
+
+    /**
+     * Renders the record as
+     * {@code consumerId@group:topic1,topic2@overTLS=value}.
+     *
+     * @return the textual form of this record
+     */
+    @Override
+    public String toString() {
+        StringBuilder sBuilder =
+                new StringBuilder(512).append(consumerId)
+                        .append("@").append(group).append(":");
+        int count = 0;
+        for (String topicItem : topicSet) {
+            if (count++ > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append(topicItem);
+        }
+        sBuilder.append("@overTLS=").append(overTLS);
+        return sBuilder.toString();
+    }
+
+    /**
+     * Two records are equal when their consumer ids are equal.
+     *
+     * @param obj the object to compare with
+     * @return true if {@code obj} is a ConsumerInfo with the same consumer id
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed
+        if (!(obj instanceof ConsumerInfo)) {
+            return false;
+        }
+        final ConsumerInfo info = (ConsumerInfo) obj;
+        return this.consumerId.equals(info.getConsumerId());
+    }
+
+    /**
+     * Hash code derived from the consumer id only, so that it agrees with
+     * {@link #equals(Object)} and the record behaves correctly as a key or
+     * element of hash-based collections.
+     *
+     * @return the hash code of the consumer id
+     */
+    @Override
+    public int hashCode() {
+        return this.consumerId.hashCode();
+    }
+
+    /**
+     * Orders records by consumer id and then by group name.
+     *
+     * @param o the record to compare with
+     * @return a negative, zero or positive value per the Comparable contract
+     */
+    @Override
+    public int compareTo(ConsumerInfo o) {
+        int result = this.consumerId.compareTo(o.consumerId);
+        if (result != 0) {
+            return result;
+        }
+        return this.group.compareTo(o.group);
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public String getGroupName() {
+        return group;
+    }
+
+    public Set<String> getTopicSet() {
+        return topicSet;
+    }
+
+    public Map<String, TreeSet<String>> getTopicConditions() {
+        return topicConditions;
+    }
+
+    public boolean isOverTLS() {
+        return overTLS;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public int getSourceCount() {
+        return this.sourceCount;
+    }
+
+    public int getNodeId() {
+        return this.nodeId;
+    }
+
+    public long getCsmFromMaxOffsetCtrlId() {
+        return csmFromMaxOffsetCtrlId;
+    }
+
+    /**
+     * @return true if the consume type is {@link ConsumeType#CONSUME_BAND}
+     */
+    public boolean isRequireBound() {
+        return this.consumeType == ConsumeType.CONSUME_BAND;
+    }
+
+    public String getSessionKey() {
+        return this.sessionKey;
+    }
+
+    public void setSessionKey(String sessionKey) {
+        this.sessionKey = sessionKey;
+    }
+
+    public Map<String, Long> getRequiredPartition() {
+        return this.requiredPartition;
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public boolean isSelectedBig() {
+        return selectedBig;
+    }
+
+    /**
+     * @return the view string captured when this record was constructed
+     */
+    public String getConsumerViewInfo() {
+        return consumerViewInfo;
+    }
+
+    public long getLstAssignedTime() {
+        return lstAssignedTime;
+    }
+
+    public long getUsedTopicMetaInfoId() {
+        return usedTopicMetaInfoId;
+    }
+
+    /**
+     * @return a new tuple holding the consumer id and the current overTLS flag
+     */
+    public Tuple2<String, Boolean> getConsumerIdAndTlsInfoTuple() {
+        return new Tuple2<>(consumerId, overTLS);
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 995140d..fea5dcb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -17,464 +17,401 @@
package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.apache.inlong.tubemq.server.common.utils.RowLock;
+import org.apache.inlong.tubemq.server.master.MasterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConsumerInfoHolder {
- private final ConcurrentHashMap<String/* group */, ConsumerBandInfo> groupInfoMap =
+ private static final Logger logger =
+ LoggerFactory.getLogger(ConsumerInfoHolder.class);
+ private final MasterConfig masterConfig; // master configure
+ private final RowLock groupRowLock; //lock
+ private final ConcurrentHashMap<String/* group */, ConsumeGroupInfo> groupInfoMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* consumerId */, String/* group */> consumerIndexMap =
new ConcurrentHashMap<>();
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final ConcurrentHashSet<String/* group */> serverBalanceGroupSet =
+ new ConcurrentHashSet<>();
+ private final ConcurrentHashSet<String/* group */> clientBalanceGroupSet =
+ new ConcurrentHashSet<>();
+
+ /**
+  * Creates the holder and its group row lock.
+  *
+  * @param masterConfig the master configuration; it is retained by the
+  *                     holder and supplies the row-lock wait duration
+  */
+ public ConsumerInfoHolder(MasterConfig masterConfig) {
+     this.groupRowLock = new RowLock("Group-RowLock",
+             masterConfig.getRowLockWaitDurMs());
+     this.masterConfig = masterConfig;
+ }
+
+ /**
+  * Returns the default resource rate, read from the master configuration.
+  *
+  * @return the value of {@code MasterConfig#getMaxGroupBrokerConsumeRate()}
+  */
+ public int getDefResourceRate() {
+     return this.masterConfig.getMaxGroupBrokerConsumeRate();
+ }
+
+ /**
+  * Tells whether a consumer group is empty.
+  *
+  * A {@code null} name and a group that is not registered in this holder
+  * are both reported as empty; otherwise the group record decides.
+  *
+  * @param groupName group name
+  * @return true: empty, false: not empty
+  */
+ public boolean isConsumeGroupEmpty(String groupName) {
+     if (groupName != null) {
+         ConsumeGroupInfo groupInfo = groupInfoMap.get(groupName);
+         if (groupInfo != null) {
+             return groupInfo.isGroupEmpty();
+         }
+     }
+     return true;
+ }
/**
- * Get consumer info list in a group
+ * Get consumer id list in a group
*
* @param group the consumer group name
- * @return a consumer list
+ * @return the consumer id list of the group
*/
- public List<ConsumerInfo> getConsumerList(String group) {
+ public List<String> getConsumerIdList(String group) {
if (group == null) {
return Collections.emptyList();
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- List<ConsumerInfo> oldConsumerList =
- oldConsumeBandInfo.getConsumerInfoList();
- if (oldConsumerList != null) {
- List<ConsumerInfo> consumerList =
- new ArrayList<>(oldConsumerList.size());
- for (ConsumerInfo consumer : oldConsumerList) {
- if (consumer != null) {
- consumerList.add(consumer.clone());
- }
- }
- return consumerList;
- }
- }
- } finally {
- rwLock.readLock().unlock();
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo == null) {
+ return Collections.emptyList();
+ }
+ return consumeGroupInfo.getConsumerIdList();
+ }
+
+ /**
+ * Get the client information of the consumer group
+ *
+ * The query content of this API is the content presented when
+ * the Web API queries the client information.
+ *
+ * @param group the consumer group name
+ * @return the consumer id with subscribed topic and link type of the group
+ */
+ public List<String> getConsumerViewList(String group) {
+ if (group == null) {
+ return Collections.emptyList();
}
- return Collections.emptyList();
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo == null) {
+ return Collections.emptyList();
+ }
+ return consumeGroupInfo.getConsumerViewInfos();
}
/**
- * Get consumer band info
+ * Get the consumerId and tls information of the consumer group
+ *
+ * include consumerId and isOverTLS information.
+ *
+ * @param group the consumer group name
+ * @return the consumer info of the group
+ */
+ public List<Tuple2<String, Boolean>> getConsumerIdAndTlsInfos(String group) {
+     // a null name is never looked up; it is treated like an unknown group
+     ConsumeGroupInfo groupInfo = null;
+     if (group != null) {
+         groupInfo = groupInfoMap.get(group);
+     }
+     if (groupInfo == null) {
+         return Collections.emptyList();
+     }
+     return groupInfo.getConsumerIdAndTlsInfos();
+ }
+
+ /**
+ * Get consume group information
*
* @param group group name
- * @return a ConsumerBandInfo
+ * @return consume group information
*/
- public ConsumerBandInfo getConsumerBandInfo(String group) {
+ public ConsumeGroupInfo getConsumeGroupInfo(String group) {
if (group == null) {
return null;
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- return oldConsumeBandInfo.clone();
- }
- } finally {
- rwLock.readLock().unlock();
- }
- return null;
+ return groupInfoMap.get(group);
}
/**
* Add current check cycle
*
* @param group group name
- * @return
+ * @return updated check cycle value
*/
public Long addCurCheckCycle(String group) {
if (group == null) {
return null;
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- return oldConsumeBandInfo.addCurCheckCycle();
- }
- } finally {
- rwLock.readLock().unlock();
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ return consumeGroupInfo.addCurCheckCycle();
}
return null;
}
/**
- * Set current broker/client ratio
+ * get subscribed topic set of group
*
- * @param group group name
- * @param defBClientRate default broker/client ratio
- * @param confBClientRate config broker/client ratio
- * @param curBClientRate current broker/client ratio
- * @param minRequireClientCnt minimal client count
- * @param isRebalanced if need re-balance
+ * @param group group name
+ * @return subscribed topic set
*/
- public void setCurConsumeBClientInfo(String group, int defBClientRate,
- int confBClientRate, int curBClientRate,
- int minRequireClientCnt, boolean isRebalanced) {
+ public Set<String> getGroupTopicSet(String group) {
if (group == null) {
- return;
+ return Collections.emptySet();
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- oldConsumeBandInfo
- .setCurrConsumeBClientInfo(defBClientRate,
- confBClientRate, curBClientRate,
- minRequireClientCnt, isRebalanced);
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ return consumeGroupInfo.getTopicSet();
+ }
+ return Collections.emptySet();
+ }
+
+ /**
+ * get current consumer count of group
+ *
+ * @param group group name
+ * @return consumer count
+ */
+ public int getConsumerCnt(String group) {
+ int count = 0;
+ if (group != null) {
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ count = consumeGroupInfo.getGroupCnt();
}
- } finally {
- rwLock.readLock().unlock();
}
- return;
+ return count;
}
/**
- * Add allocate time
+ * get need rebalanced consumer of group
*
* @param group group name
+ * @return need rebalanced consumer
*/
- public void addAllocatedTimes(String group) {
+ public RebProcessInfo getNeedRebNodeList(String group) {
+ RebProcessInfo rebProcessInfo = new RebProcessInfo();
if (group == null) {
- return;
+ return rebProcessInfo;
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- oldConsumeBandInfo.addAllocatedTimes();
- }
- } finally {
- rwLock.readLock().unlock();
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ rebProcessInfo = consumeGroupInfo.getNeedBalanceNodes();
}
- return;
+ return rebProcessInfo;
}
/**
- * Set allocated value
+ * set rebalanced consumer id of group
*
* @param group group name
+ * @param processList rebalanced consumer id
*/
- public void setAllocated(String group) {
+ public void setRebNodeProcessed(String group,
+ List<String> processList) {
if (group == null) {
return;
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- oldConsumeBandInfo.settAllocated();
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ consumeGroupInfo.setBalanceNodeProcessed(processList);
+ }
+ }
+
+ /**
+ * Book the consumers of the group that need to be rebalanced; null arguments are ignored
+ *
+ * @param group group name
+ * @param consumerIdSet ids of the consumers to rebalance; only ids indexed under group are booked
+ * @param waitDuration wait duration
+ */
+ public void addRebConsumerInfo(String group, Set<String> consumerIdSet, int waitDuration) {
+ ConsumeGroupInfo consumeGroupInfo = (group == null) ? null : groupInfoMap.get(group);
+ if (consumeGroupInfo != null && consumerIdSet != null) {
+ for (String consumerId : consumerIdSet) {
+ String oldGroup = consumerIndexMap.get(consumerId);
+ if (group.equals(oldGroup)) {
+ consumeGroupInfo.addNodeRelInfo(consumerId, waitDuration);
+ }
}
- } finally {
- rwLock.readLock().unlock();
}
- return;
}
/**
* Check if allocated
*
* @param group group name
- * @return if not allocated
+ * @return allocate status
*/
public boolean isNotAllocated(String group) {
if (group == null) {
return false;
}
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- return oldConsumeBandInfo.isNotAllocate();
- }
- } finally {
- rwLock.readLock().unlock();
+ ConsumeGroupInfo consumeGroupInfo =
+ groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ return consumeGroupInfo.isNotAllocate();
}
return false;
}
- public Set<String> getGroupTopicSet(String group) {
- if (group != null) {
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- return oldConsumeBandInfo.getTopicSet();
- }
- } finally {
- rwLock.readLock().unlock();
- }
+ /**
+ * get group name of consumer
+ *
+ * @param consumerId consumer id
+ * @return the group name of consumer
+ */
+ public String getGroupName(String consumerId) {
+ return consumerIndexMap.get(consumerId);
+ }
+
+ /**
+ * get all registered group name
+ *
+ * @return the group name registered
+ */
+ public List<String> getAllGroupName() {
+ if (groupInfoMap.isEmpty()) {
+ return Collections.emptyList();
}
- return Collections.emptySet();
+ return new ArrayList<>(groupInfoMap.keySet());
}
/**
- * Get the group's subscribed topics and ConsumerInfos.
+ * get all server-balance group name
*
- * @param group group to be queried
- * @param result query result, only read
- * @return Has the data been found
+ * @return the names of the registered server-balance groups
*/
- public boolean getGroupTopicSetAndConsumerInfos(
- String group, Tuple2<Set<String>, List<ConsumerInfo>> result) {
- if (group != null) {
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo curGroupInfo =
- groupInfoMap.get(group);
- if (curGroupInfo != null) {
- result.setF0AndF1(curGroupInfo.getTopicSet(),
- curGroupInfo.cloneConsumerInfoList());
- return true;
- }
- } finally {
- rwLock.readLock().unlock();
- }
+ public List<String> getAllServerBalanceGroups() {
+ if (serverBalanceGroupSet.isEmpty()) {
+ return Collections.emptyList();
}
- result.setF0AndF1(Collections.emptySet(), Collections.emptyList());
- return false;
+ return new ArrayList<>(serverBalanceGroupSet);
}
/**
- * Get the group's subscribed topics and client count.
+ * get all client-balance group name
*
- * @param group group to be queried
- * @param result query result, only read
- * @return Has the data been found
+ * @return the names of the registered client-balance groups
*/
- public boolean getGroupTopicSetAndClientCnt(String group,
- Tuple2<Set<String>, Integer> result) {
- if (group != null) {
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo curGroupInfo =
- groupInfoMap.get(group);
- if (curGroupInfo != null) {
- result.setF0AndF1(curGroupInfo.getTopicSet(),
- curGroupInfo.getGroupCnt());
- return true;
- }
- } finally {
- rwLock.readLock().unlock();
- }
+ public List<String> getAllClientBalanceGroups() {
+ if (clientBalanceGroupSet.isEmpty()) {
+ return Collections.emptyList();
}
- result.setF0AndF1(Collections.emptySet(), 0);
- return false;
+ return new ArrayList<>(clientBalanceGroupSet);
}
/**
- * Add a consumer into consumer band info, if consumer band info not exist, will create a new one
+ * get the consumer info by consumer id
*
- * @param consumer consumer info
- * @param isNotAllocated if not allocated
- * @param isSelectedBig select big data
- * @return a ConsumerBandInfo
+ * @param consumerId the consumer id
+ * @return the consumer info
*/
- public ConsumerBandInfo addConsumer(ConsumerInfo consumer,
- boolean isNotAllocated,
- boolean isSelectedBig) {
- ConsumerBandInfo consumeBandInfo = null;
- String group = consumer.getGroup();
- try {
- rwLock.writeLock().lock();
- consumeBandInfo = groupInfoMap.get(group);
- if (consumeBandInfo == null) {
- ConsumerBandInfo tmpBandInfo =
- new ConsumerBandInfo(isSelectedBig);
- consumeBandInfo =
- groupInfoMap.putIfAbsent(group, tmpBandInfo);
- if (consumeBandInfo == null) {
- consumeBandInfo = tmpBandInfo;
- }
- }
- consumeBandInfo.addConsumer(consumer);
- if (!isNotAllocated) {
- consumeBandInfo.settAllocated();
+ public ConsumerInfo getConsumerInfo(String consumerId) {
+ ConsumerInfo consumerInfo = null;
+ String groupName = consumerIndexMap.get(consumerId);
+ if (groupName != null) {
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(groupName);
+ if (consumeGroupInfo != null) {
+ consumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
}
- consumerIndexMap.put(consumer.getConsumerId(), group);
- } finally {
- rwLock.writeLock().unlock();
}
- return consumeBandInfo;
+ return consumerInfo;
}
- public void addRebConsumerInfo(String group,
- Set<String> consumerIdSet,
- int waitDuration) {
+ /**
+ * Add consumer to its group and store the group object in result on success,
+ * if the consumer is the first one, then create the group object
+ *
+ * @param consumer consumer info
+ * @param isNotAllocated whether not yet allocated; if false the group is marked allocated
+ * @param sBuffer string buffer
+ * @param result check result
+ * @return process result
+ */
+ public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated,
+ StringBuilder sBuffer, ParamCheckResult result) {
+ ConsumeGroupInfo consumeGroupInfo = null;
+ String group = consumer.getGroupName();
+ Integer lid = null;
try {
- rwLock.readLock().lock();
- ConsumerBandInfo consumeBandInfo =
- groupInfoMap.get(group);
- if (consumeBandInfo != null) {
- for (String consumerId : consumerIdSet) {
- String oldGroup = consumerIndexMap.get(consumerId);
- if (group.equals(oldGroup)) {
- consumeBandInfo.addNodeRelInfo(consumerId, waitDuration);
+ lid = groupRowLock.getLock(null,
+ StringUtils.getBytesUtf8(group), true);
+ consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo == null) {
+ ConsumeGroupInfo tmpGroupInfo = new ConsumeGroupInfo(consumer);
+ consumeGroupInfo = groupInfoMap.putIfAbsent(group, tmpGroupInfo);
+ if (consumeGroupInfo == null) {
+ consumeGroupInfo = tmpGroupInfo;
+ if (tmpGroupInfo.isClientBalance()) {
+ clientBalanceGroupSet.add(group);
+ } else {
+ serverBalanceGroupSet.add(group);
}
}
}
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- public RebProcessInfo getNeedRebNodeList(String group) {
- RebProcessInfo rebProcessInfo = new RebProcessInfo();
- if (group == null) {
- return rebProcessInfo;
- }
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo consumeBandInfo =
- groupInfoMap.get(group);
- if (consumeBandInfo != null) {
- rebProcessInfo = consumeBandInfo.getNeedRebNodeList();
+ if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
+ if (!isNotAllocated) {
+ consumeGroupInfo.settAllocated();
+ }
+ consumerIndexMap.put(consumer.getConsumerId(), group);
+ result.setCheckData(consumeGroupInfo);
}
+ } catch (IOException e) {
+ logger.warn("Failed to lock.", e);
} finally {
- rwLock.readLock().unlock();
- }
- return rebProcessInfo;
- }
-
- public void setRebNodeProcessed(String group,
- List<String> processList) {
- if (group == null) {
- return;
- }
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo consumeBandInfo =
- groupInfoMap.get(group);
- if (consumeBandInfo != null) {
- consumeBandInfo.setRebNodeProcessed(processList);
+ if (lid != null) {
+ groupRowLock.releaseRowLock(lid);
}
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- public boolean exist(String consumerId) {
- boolean isExist = false;
- try {
- rwLock.readLock().lock();
- isExist = consumerIndexMap.containsKey(consumerId);
- } finally {
- rwLock.readLock().unlock();
}
- return isExist;
+ return result.result;
}
- public String getGroup(String consumerId) {
- String groupName = null;
- try {
- rwLock.readLock().lock();
- groupName = consumerIndexMap.get(consumerId);
- } finally {
- rwLock.readLock().unlock();
- }
- return groupName;
- }
-
- public ConsumerInfo removeConsumer(String group,
- String consumerId) {
- if (group == null
- || consumerId == null) {
+ /**
+ * remove the consumer and return consumer object,
+ * if the consumer is the last one, then remove the group object and its balance-type index entry
+ *
+ * @param group group name of consumer
+ * @param consumerId consumer id
+ * @return the removed consumer; null if an argument is null, the group is unknown, or locking failed
+ */
+ public ConsumerInfo removeConsumer(String group, String consumerId) {
+ if (group == null || consumerId == null) {
return null;
}
ConsumerInfo consumer = null;
+ Integer lid = null;
try {
- rwLock.writeLock().lock();
- ConsumerBandInfo consumeBandInfo =
- groupInfoMap.get(group);
- if (consumeBandInfo != null) {
- consumer = consumeBandInfo.removeConsumer(consumerId);
- if (consumeBandInfo.getGroupCnt() == 0) {
+ lid = groupRowLock.getLock(null,
+ StringUtils.getBytesUtf8(group), true);
+ ConsumeGroupInfo consumeGroupInfo = groupInfoMap.get(group);
+ if (consumeGroupInfo != null) {
+ consumer = consumeGroupInfo.removeConsumer(consumerId);
+ if (consumeGroupInfo.isGroupEmpty()) {
groupInfoMap.remove(group);
+ if (consumeGroupInfo.isClientBalance()) {
+ clientBalanceGroupSet.remove(group);
+ } else {
+ serverBalanceGroupSet.remove(group);
+ }
}
}
consumerIndexMap.remove(consumerId);
+ } catch (IOException e) {
+ logger.warn("Failed to lock.", e);
} finally {
- rwLock.writeLock().unlock();
- }
- return consumer;
- }
-
- public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) {
- try {
- rwLock.readLock().lock();
- ConsumerInfo consumerInfo = null;
- String groupName = consumerIndexMap.get(consumerId);
- if (groupName != null) {
- ConsumerBandInfo consumeBandInfo = groupInfoMap.get(groupName);
- if (consumeBandInfo != null) {
- consumerInfo = consumeBandInfo.getConsumerInfo(consumerId);
- }
+ if (lid != null) {
+ groupRowLock.releaseRowLock(lid);
}
- return new Tuple2<>(groupName, consumerInfo);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- public List<String> getAllGroup() {
- try {
- rwLock.readLock().lock();
- if (groupInfoMap.isEmpty()) {
- return Collections.emptyList();
- } else {
- List<String> groupList =
- new ArrayList<>(groupInfoMap.size());
- groupList.addAll(groupInfoMap.keySet());
- return groupList;
- }
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- public int getConsumerCnt(String group) {
- int count = 0;
- if (group == null) {
- return 0;
- }
- try {
- rwLock.readLock().lock();
- ConsumerBandInfo oldConsumeBandInfo =
- groupInfoMap.get(group);
- if (oldConsumeBandInfo != null) {
- count = oldConsumeBandInfo.getGroupCnt();
- }
- } finally {
- rwLock.readLock().unlock();
}
- return count;
- }
-
- public void clear() {
- consumerIndexMap.clear();
- groupInfoMap.clear();
+ return consumer;
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
new file mode 100644
index 0000000..766af32
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
+
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+
+public class TopicConfigInfo implements Comparable<TopicConfigInfo> { // topic settings on one broker
+ private final int brokerId; // copied from TopicDeployEntity#getBrokerId()
+ private final String topicName; // copied from TopicDeployEntity#getTopicName()
+ private final int numStore; // copied from TopicDeployEntity#getNumTopicStores()
+ private final int numPart; // copied from TopicDeployEntity#getNumPartitions()
+ private boolean acceptSub; // the only mutable field; false until setAcceptSub() is called
+
+ public TopicConfigInfo(TopicDeployEntity topicDeployEntity) { // snapshots the entity's values
+ this.brokerId = topicDeployEntity.getBrokerId();
+ this.topicName = topicDeployEntity.getTopicName();
+ this.numStore = topicDeployEntity.getNumTopicStores();
+ this.numPart = topicDeployEntity.getNumPartitions();
+ this.acceptSub = false; // not derived from the entity
+ }
+
+ public int getBrokerId() {
+ return brokerId;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public int getNumStore() {
+ return numStore;
+ }
+
+ public void setAcceptSub(boolean acceptSub) {
+ this.acceptSub = acceptSub;
+ }
+
+ public boolean isAcceptSub() {
+ return acceptSub;
+ }
+
+ public int getNumPart() {
+ return numPart;
+ }
+
+ @Override
+ public int compareTo(TopicConfigInfo o) {
+ return Integer.compare(this.brokerId, o.brokerId); // orders by brokerId only; other fields ignored
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
index 4aa633a..e5d6545 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/action/screen/Master.java
@@ -23,13 +23,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.corerpc.exception.StandbyException;
@@ -98,21 +97,26 @@ public class Master implements Action {
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
String group = req.getParameter("group");
if (group != null) {
- List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
int index = 1;
- if (consumerList != null && !consumerList.isEmpty()) {
- Collections.sort(consumerList);
- for (ConsumerInfo consumer : consumerList) {
- sBuilder.append(index).append(". ").append(consumer.toString()).append("\n");
- index++;
- }
- } else {
- sBuilder.append("No such group.\n\nCurrent all groups");
- List<String> groupList = consumerHolder.getAllGroup();
- sBuilder.append("(").append(groupList.size()).append("):\n");
+ List<String> consumerViewInfos =
+ consumerHolder.getConsumerViewList(group);
+ if (CollectionUtils.isEmpty(consumerViewInfos)) {
+ List<String> groupList = consumerHolder.getAllGroupName();
+ sBuilder.append("No such group.\n\nCurrent all groups(")
+ .append(groupList.size()).append("):\n");
for (String currGroup : groupList) {
sBuilder.append(currGroup).append("\n");
}
+ } else {
+ Collections.sort(consumerViewInfos);
+ for (String consumerViewInfo : consumerViewInfos) {
+ if (consumerViewInfo == null) {
+ continue;
+ }
+ sBuilder.append(index).append(". ")
+ .append(consumerViewInfo).append("\n");
+ index++;
+ }
}
}
}
@@ -128,20 +132,27 @@ public class Master implements Action {
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
String group = req.getParameter("group");
if (group != null) {
- List<ConsumerInfo> consumerList = consumerHolder.getConsumerList(group);
- if (consumerList != null && !consumerList.isEmpty()) {
- Collections.sort(consumerList);
+ List<Tuple2<String, Boolean>> consumerList =
+ consumerHolder.getConsumerIdAndTlsInfos(group);
+ if (CollectionUtils.isEmpty(consumerList)) {
+ List<String> groupList = consumerHolder.getAllGroupName();
+ sBuilder.append("No such group.\n\nCurrent all group(")
+ .append(groupList.size()).append("):\n");
+ for (String currGroup : groupList) {
+ sBuilder.append(currGroup).append("\n");
+ }
+ } else {
sBuilder.append("\n########################## Subscribe Relationship ############################\n\n");
Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
master.getCurrentSubInfoMap();
for (int i = 0; i < consumerList.size(); i++) {
- ConsumerInfo consumer = consumerList.get(i);
+ Tuple2<String, Boolean> consumer = consumerList.get(i);
sBuilder.append("*************** ").append(i + 1)
- .append(". ").append(consumer.getConsumerId())
- .append("#isOverTLS=").append(consumer.isOverTLS())
+ .append(". ").append(consumer.getF0())
+ .append("#isOverTLS=").append(consumer.getF1())
.append(" ***************");
Map<String, Map<String, Partition>> topicSubMap =
- currentSubInfoMap.get(consumer.getConsumerId());
+ currentSubInfoMap.get(consumer.getF0());
if (topicSubMap != null) {
int totalSize = 0;
for (Map.Entry<String, Map<String, Partition>> entry : topicSubMap.entrySet()) {
@@ -152,7 +163,7 @@ public class Master implements Action {
Map<String, Partition> partMap = entry.getValue();
if (partMap != null) {
for (Partition part : partMap.values()) {
- sBuilder.append(consumer.getConsumerId())
+ sBuilder.append(consumer.getF0())
.append("#").append(part.toString()).append("\n");
}
}
@@ -160,13 +171,6 @@ public class Master implements Action {
}
sBuilder.append("\n\n");
}
- } else {
- sBuilder.append("No such group.\n\nCurrent all group");
- List<String> groupList = consumerHolder.getAllGroup();
- sBuilder.append("(").append(groupList.size()).append("):\n");
- for (String currGroup : groupList) {
- sBuilder.append(currGroup).append("\n");
- }
}
}
}
@@ -192,7 +196,7 @@ public class Master implements Action {
if (topic != null) {
TopicPSInfoManager topicPSInfoManager =
master.getTopicPSInfoManager();
- ConcurrentHashSet<String> producerSet =
+ Set<String> producerSet =
topicPSInfoManager.getTopicPubInfo(topic);
if (producerSet != null && !producerSet.isEmpty()) {
int index = 1;
@@ -279,7 +283,8 @@ public class Master implements Action {
*/
private void getTopicPubInfo(final HttpServletRequest req, StringBuilder sBuilder) {
String topic = req.getParameter("topic");
- Set<String> producerIds = master.getTopicPSInfoManager().getTopicPubInfo(topic);
+ Set<String> producerIds =
+ master.getTopicPSInfoManager().getTopicPubInfo(topic);
if (producerIds != null && !producerIds.isEmpty()) {
for (String producerId : producerIds) {
sBuilder.append(producerId).append("\n");
@@ -301,27 +306,38 @@ public class Master implements Action {
Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
master.getCurrentSubInfoMap();
int currPartSize = 0;
- List<String> groupList = consumerHolder.getAllGroup();
- Tuple2<Set<String>, List<ConsumerInfo>> queryInfo = new Tuple2<>();
+ Set<String> topicSet;
+ List<Partition> partList;
+ List<String> consumerIdList;
+ Map<String, Partition> topicSubInfoMap;
+ Map<String, Map<String, Partition>> consumerSubInfoMap;
+ List<String> groupList = consumerHolder.getAllServerBalanceGroups();
for (String group : groupList) {
- if (!consumerHolder.getGroupTopicSetAndConsumerInfos(group, queryInfo)) {
+ if (group == null) {
continue;
}
- for (String topic : queryInfo.getF0()) {
+ topicSet = consumerHolder.getGroupTopicSet(group);
+ for (String topic : topicSet) {
+ if (topic == null) {
+ continue;
+ }
currPartSize = 0;
- for (ConsumerInfo consumer : queryInfo.getF1()) {
- Map<String, Map<String, Partition>> consumerSubInfoMap =
- currentSubInfoMap.get(consumer.getConsumerId());
- if (consumerSubInfoMap != null) {
- Map<String, Partition> topicSubInfoMap =
- consumerSubInfoMap.get(topic);
- if (topicSubInfoMap != null) {
- currPartSize += topicSubInfoMap.size();
+ consumerIdList = consumerHolder.getConsumerIdList(group);
+ if (CollectionUtils.isNotEmpty(consumerIdList)) {
+ for (String consumerId : consumerIdList) {
+ if (consumerId == null) {
+ continue;
+ }
+ consumerSubInfoMap = currentSubInfoMap.get(consumerId);
+ if (consumerSubInfoMap != null) {
+ topicSubInfoMap = consumerSubInfoMap.get(topic);
+ if (topicSubInfoMap != null) {
+ currPartSize += topicSubInfoMap.size();
+ }
}
}
}
- List<Partition> partList =
- brokerRunManager.getSubBrokerAcceptSubParts(topic);
+ partList = brokerRunManager.getSubBrokerAcceptSubParts(topic);
if (currPartSize != partList.size()) {
sBuilder.append(group).append(":").append(topic).append("\n");
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 7d628cd..7aec03b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -32,7 +32,7 @@ import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
import org.slf4j.Logger;
@@ -724,16 +724,16 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler {
Set<String> consumerIdSet = (Set<String>) result.getRetData();
ConsumerInfoHolder consumerInfoHolder =
master.getConsumerHolder();
- ConsumerBandInfo consumerBandInfo =
- consumerInfoHolder.getConsumerBandInfo(groupName);
- if (consumerBandInfo == null) {
+ ConsumeGroupInfo consumeGroupInfo =
+ consumerInfoHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
String errInfo = sBuffer.append("The group(")
.append(groupName).append(") not online!").toString();
sBuffer.delete(0, sBuffer.length());
WebParameterUtils.buildFailResult(sBuffer, errInfo);
return sBuffer;
}
- Map<String, NodeRebInfo> nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
+ Map<String, NodeRebInfo> nodeRebInfoMap = consumeGroupInfo.getBalanceMap();
for (String consumerId : consumerIdSet) {
if (nodeRebInfoMap.containsKey(consumerId)) {
String errInfo = sBuffer.append("Duplicated set for consumerId(")
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
index bf7a3cb..10791a0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java
@@ -27,15 +27,16 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
-import org.apache.inlong.tubemq.corebase.cluster.ConsumerInfo;
+
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
-import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
-import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeGroupInfo;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumeType;
+import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfo;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
@@ -96,25 +97,23 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
topicPSInfoManager.getGroupSetWithSubTopic(inGroupNameSet, topicNameSet);
int totalCnt = 0;
int topicCnt = 0;
- Tuple2<Set<String>, Integer> queryInfo = new Tuple2<>();
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (String group : queryGroupSet) {
- if (!consumerHolder.getGroupTopicSetAndClientCnt(group, queryInfo)) {
- continue;
- }
if (totalCnt++ > 0) {
sBuffer.append(",");
}
sBuffer.append("{\"consumeGroup\":\"").append(group).append("\",\"topicSet\":[");
topicCnt = 0;
- for (String tmpTopic : queryInfo.getF0()) {
+ Set<String> topicSet = consumerHolder.getGroupTopicSet(group);
+ int consumerCnt = consumerHolder.getConsumerCnt(group);
+ for (String tmpTopic : topicSet) {
if (topicCnt++ > 0) {
sBuffer.append(",");
}
sBuffer.append("\"").append(tmpTopic).append("\"");
}
- sBuffer.append("],\"consumerNum\":").append(queryInfo.getF1()).append("}");
+ sBuffer.append("],\"consumerNum\":").append(consumerCnt).append("}");
}
WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
return sBuffer;
@@ -143,7 +142,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
}
String strConsumeGroup = (String) result.getRetData();
try {
- boolean isBandConsume = false;
+ ConsumeType consumeType = ConsumeType.CONSUME_NORMAL;
boolean isNotAllocate = false;
boolean isSelectBig = true;
String sessionKey = "";
@@ -154,37 +153,37 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
int confBClientRate = -2;
int curBClientRate = -2;
int minRequireClientCnt = -2;
- int rebalanceStatus = -2;
+ int balanceStatus = -2;
Set<String> topicSet = new HashSet<>();
List<ConsumerInfo> consumerList = new ArrayList<>();
Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<>();
Map<String, TreeSet<String>> existedTopicConditions = new HashMap<>();
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
- ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(strConsumeGroup);
- if (consumerBandInfo != null) {
- if (consumerBandInfo.getTopicSet() != null) {
- topicSet = consumerBandInfo.getTopicSet();
+ ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(strConsumeGroup);
+ if (consumeGroupInfo != null) {
+ if (consumeGroupInfo.getTopicSet() != null) {
+ topicSet = consumeGroupInfo.getTopicSet();
}
- if (consumerBandInfo.getConsumerInfoList() != null) {
- consumerList = consumerBandInfo.getConsumerInfoList();
+ if (consumeGroupInfo.getConsumerInfoList() != null) {
+ consumerList = consumeGroupInfo.getConsumerInfoList();
}
- if (consumerBandInfo.getTopicConditions() != null) {
- existedTopicConditions = consumerBandInfo.getTopicConditions();
+ if (consumeGroupInfo.getTopicConditions() != null) {
+ existedTopicConditions = consumeGroupInfo.getTopicConditions();
}
- nodeRebInfoMap = consumerBandInfo.getRebalanceMap();
- isBandConsume = consumerBandInfo.isBandConsume();
- rebalanceStatus = consumerBandInfo.getRebalanceCheckStatus();
- defBClientRate = consumerBandInfo.getDefBClientRate();
- confBClientRate = consumerBandInfo.getConfBClientRate();
- curBClientRate = consumerBandInfo.getCurBClientRate();
- minRequireClientCnt = consumerBandInfo.getMinRequireClientCnt();
- if (isBandConsume) {
- isNotAllocate = consumerBandInfo.isNotAllocate();
- isSelectBig = consumerBandInfo.isSelectedBig();
- sessionKey = consumerBandInfo.getSessionKey();
- reqSourceCount = consumerBandInfo.getSourceCount();
- curSourceCount = consumerBandInfo.getGroupCnt();
- rebalanceCheckTime = consumerBandInfo.getCurCheckCycle();
+ nodeRebInfoMap = consumeGroupInfo.getBalanceMap();
+ consumeType = consumeGroupInfo.getConsumeType();
+ balanceStatus = consumeGroupInfo.getBalanceChkStatus();
+ defBClientRate = consumerHolder.getDefResourceRate();
+ confBClientRate = consumeGroupInfo.getConfResourceRate();
+ curBClientRate = consumeGroupInfo.getCurResourceRate();
+ minRequireClientCnt = consumeGroupInfo.getMinReqClientCnt();
+ if (consumeType == ConsumeType.CONSUME_BAND) {
+ isNotAllocate = consumeGroupInfo.isNotAllocate();
+ isSelectBig = consumeGroupInfo.isSelectedBig();
+ sessionKey = consumeGroupInfo.getSessionKey();
+ reqSourceCount = consumeGroupInfo.getSourceCount();
+ curSourceCount = consumeGroupInfo.getGroupCnt();
+ rebalanceCheckTime = consumeGroupInfo.getCurCheckCycle();
}
}
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"")
@@ -205,9 +204,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
sBuffer.append("\"").append(entry.getKey()).append("\":");
sBuffer = entry.getValue().toJsonString(sBuffer);
}
- sBuffer.append("},\"isBandConsume\":").append(isBandConsume);
+ sBuffer.append("},\"isBandConsume\":\"").append(consumeType.getName()).append("\"");
// Append band consume info
- if (isBandConsume) {
+ if (consumeType == ConsumeType.CONSUME_BAND) {
sBuffer.append(",\"isNotAllocate\":").append(isNotAllocate)
.append(",\"sessionKey\":\"").append(sessionKey)
.append("\",\"isSelectBig\":").append(isSelectBig)
@@ -216,9 +215,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
.append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime);
}
sBuffer.append(",\"rebInfo\":{");
- if (rebalanceStatus == -2) {
+ if (balanceStatus == -2) {
sBuffer.append("\"isRebalanced\":false");
- } else if (rebalanceStatus == 0) {
+ } else if (balanceStatus == 0) {
sBuffer.append("\"isRebalanced\":true,\"checkPasted\":false")
.append(",\"defBClientRate\":").append(defBClientRate)
.append(",\"confBClientRate\":").append(confBClientRate)
@@ -252,7 +251,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
}
sBuffer.append("}");
// Append consumer info of the group
- getConsumerInfoList(consumerList, isBandConsume, sBuffer);
+ getConsumerInfoList(consumerList, consumeType, sBuffer);
sBuffer.append("}");
} catch (Exception e) {
sBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -265,11 +264,11 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
* Private method to append consumer info of the given list to a string builder
*
* @param consumerList consumer list
- * @param isBandConsume whether bound consume
+ * @param consumeType consume type
* @param strBuffer string buffer
*/
private void getConsumerInfoList(final List<ConsumerInfo> consumerList,
- boolean isBandConsume, final StringBuilder strBuffer) {
+ ConsumeType consumeType, final StringBuilder strBuffer) {
strBuffer.append(",\"data\":[");
if (!consumerList.isEmpty()) {
Collections.sort(consumerList);
@@ -285,7 +284,7 @@ public class WebOtherInfoHandler extends AbstractWebHandler {
}
strBuffer.append("{\"consumerId\":\"").append(consumer.getConsumerId())
.append("\"").append(",\"isOverTLS\":").append(consumer.isOverTLS());
- if (isBandConsume) {
+ if (consumeType == ConsumeType.CONSUME_BAND) {
Map<String, Long> requiredPartition = consumer.getRequiredPartition();
if (requiredPartition == null || requiredPartition.isEmpty()) {
strBuffer.append(",\"initReSetPartCount\":0,\"initReSetPartInfo\":[]");
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
index ee4a443..b073e48 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.inlong.tubemq.server.master.nodemanage.nodebroker;
import java.util.Arrays;
import java.util.HashSet;
-import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
+import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -40,17 +40,21 @@ public class TopicPSInfoManagerTest {
@Test
public void topicSubInfo() {
- ConcurrentHashSet<String> groupSet = new ConcurrentHashSet<>();
+ Set<String> groupSet = new HashSet<>();
groupSet.add("group_001");
groupSet.add("group_002");
groupSet.add("group_003");
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add("topic001");
+ for (String groupName : groupSet) {
+ topicPSInfoManager.addGroupSubTopicInfo(groupName, topicSet);
+ }
- topicPSInfoManager.setTopicSubInfo("topic001", groupSet);
- ConcurrentHashSet<String> gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
+ Set<String> gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
Assert.assertEquals(3, gs1.size());
- topicPSInfoManager.removeTopicSubInfo("topic001", "group_001");
- topicPSInfoManager.removeTopicSubInfo("topic001", "group_002");
+ topicPSInfoManager.rmvGroupSubTopicInfo("group_001", topicSet);
+ topicPSInfoManager.rmvGroupSubTopicInfo("group_002", topicSet);
gs1 = topicPSInfoManager.getTopicSubInfo("topic001");
Assert.assertEquals(1, gs1.size());
}
@@ -63,14 +67,14 @@ public class TopicPSInfoManagerTest {
topicList.add("topic003");
topicPSInfoManager.addProducerTopicPubInfo("producer_001", topicList);
- ConcurrentHashSet<String> ti1 = topicPSInfoManager.getTopicPubInfo("topic001");
+ Set<String> ti1 = topicPSInfoManager.getTopicPubInfo("topic001");
Assert.assertEquals(1, ti1.size());
Assert.assertTrue(ti1.contains("producer_001"));
topicPSInfoManager.rmvProducerTopicPubInfo("producer_001",
new HashSet<>(Arrays.asList("topic001", "topic002")));
- ConcurrentHashSet<String> ti2 = topicPSInfoManager.getTopicPubInfo("topic003");
+ Set<String> ti2 = topicPSInfoManager.getTopicPubInfo("topic003");
Assert.assertEquals(1, ti2.size());
Assert.assertTrue(ti2.contains("producer_001"));
}