You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/15 10:57:54 UTC
[incubator-inlong] branch master updated: [INLONG-1794]Add consumer message processing logic (#1795)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6e7386b [INLONG-1794]Add consumer message processing logic (#1795)
6e7386b is described below
commit 6e7386bdd88987affdf808a2f13aa4a91ac401d9
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 15 18:57:45 2021 +0800
[INLONG-1794]Add consumer message processing logic (#1795)
---
.../inlong/tubemq/corebase/TBaseConstants.java | 3 +
.../inlong/tubemq/corebase/utils/OpsSyncInfo.java | 117 ++++
.../inlong/tubemq/server/master/MasterConfig.java | 13 +
.../inlong/tubemq/server/master/TMaster.java | 741 +++++++++++++++++++--
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 58 ++
.../nodemanage/nodeconsumer/TopicConfigInfo.java | 65 --
6 files changed, 894 insertions(+), 103 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index a15126b..ca5d31f 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -80,4 +80,7 @@ public class TBaseConstants {
public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+ public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
+ public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;
+
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java
new file mode 100644
index 0000000..4eb9dc6
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/OpsSyncInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.utils;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+
+public class OpsSyncInfo {
+
+ private boolean updated = false;
+ private long groupFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+ private long defFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+ private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+ private long csmFrmMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+ private boolean requireAuth = false;
+ private String defFlowControlInfo = "";
+ private String groupFlowControlInfo = "";
+
+ public OpsSyncInfo() {
+
+ }
+
+ public void updOpsSyncInfo(ClientMaster.OpsTaskInfo opsTaskInfo) {
+ if (opsTaskInfo == null) {
+ return;
+ }
+ if (opsTaskInfo.hasDefFlowCheckId()) {
+ this.defFlowChkId = opsTaskInfo.getDefFlowCheckId();
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasGroupFlowCheckId()) {
+ this.groupFlowChkId = opsTaskInfo.getGroupFlowCheckId();
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasQryPriorityId()) {
+ this.qryPriorityId = opsTaskInfo.getQryPriorityId();
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId()
+ && opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) {
+ this.csmFrmMaxOffsetCtrlId = opsTaskInfo.getCsmFrmMaxOffsetCtrlId();
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasRequireAuth() && opsTaskInfo.getRequireAuth()) {
+ this.requireAuth = true;
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasDefFlowControlInfo()
+ && TStringUtils.isNotBlank(opsTaskInfo.getDefFlowControlInfo())) {
+ this.defFlowControlInfo = opsTaskInfo.getDefFlowControlInfo();
+ this.updated = true;
+ }
+ if (opsTaskInfo.hasGroupFlowControlInfo()
+ && TStringUtils.isNotBlank(opsTaskInfo.getGroupFlowControlInfo())) {
+ this.groupFlowControlInfo = opsTaskInfo.getGroupFlowControlInfo();
+ this.updated = true;
+ }
+ }
+
+ public boolean isUpdated() {
+ return updated;
+ }
+
+ public long getGroupFlowChkId() {
+ return groupFlowChkId;
+ }
+
+ public long getDefFlowChkId() {
+ return defFlowChkId;
+ }
+
+ public int getQryPriorityId() {
+ return qryPriorityId;
+ }
+
+ public long getCsmFromMaxOffsetCtrlId() {
+ return csmFrmMaxOffsetCtrlId;
+ }
+
+ public boolean isRequireAuth() {
+ return requireAuth;
+ }
+
+ public String getDefFlowControlInfo() {
+ return defFlowControlInfo;
+ }
+
+ public String getGroupFlowControlInfo() {
+ return groupFlowControlInfo;
+ }
+
+ public void clear() {
+ this.updated = false;
+ this.groupFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.defFlowChkId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.csmFrmMaxOffsetCtrlId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.requireAuth = false;
+ this.defFlowControlInfo = "";
+ this.groupFlowControlInfo = "";
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 1afabf7..0daa096 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -80,6 +80,7 @@ public class MasterConfig extends AbstractFileConfig {
private String visitPassword = "";
private long authValidTimeStampPeriodMs = TBaseConstants.CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL;
private int rebalanceParallel = 4;
+ private long maxMetaForceUpdatePeriodMs = TBaseConstants.CFG_DEF_META_FORCE_UPDATE_PERIOD;
/**
* getters
@@ -258,6 +259,10 @@ public class MasterConfig extends AbstractFileConfig {
return rebalanceParallel;
}
+ public long getMaxMetaForceUpdatePeriodMs() {
+ return maxMetaForceUpdatePeriodMs;
+ }
+
/**
* Load file section attributes
*
@@ -489,6 +494,13 @@ public class MasterConfig extends AbstractFileConfig {
int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
this.rebalanceParallel = MixedUtils.mid(tmpParallel, 1, 20);
}
+ if (TStringUtils.isNotBlank(masterConf.get("maxMetaForceUpdatePeriodMs"))) {
+ long tmpPeriodMs = this.getLong(masterConf, "maxMetaForceUpdatePeriodMs");
+ if (tmpPeriodMs < TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD) {
+ tmpPeriodMs = TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD;
+ }
+ this.maxMetaForceUpdatePeriodMs = tmpPeriodMs;
+ }
}
/**
@@ -636,6 +648,7 @@ public class MasterConfig extends AbstractFileConfig {
.append("visitName", visitName)
.append("visitPassword", visitPassword)
.append("rebalanceParallel", rebalanceParallel)
+ .append("maxMetaForceUpdatePeriodMs", maxMetaForceUpdatePeriodMs)
.append(",").append(replicationConfig.toString())
.append(",").append(tlsConfig.toString())
.append(",").append(zkConfig.toString())
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 0bee4de..f323287 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -45,6 +45,7 @@ import org.apache.inlong.tubemq.corebase.cluster.NodeAddrInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.ProducerInfo;
import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
+import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
import org.apache.inlong.tubemq.corebase.config.TLSConfig;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
@@ -77,6 +78,7 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.Registe
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2P;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.OpsSyncInfo;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
@@ -97,6 +99,7 @@ import org.apache.inlong.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.inlong.tubemq.server.common.offsetstorage.ZkOffsetStorage;
import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
+import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo;
import org.apache.inlong.tubemq.server.common.utils.HasThread;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
import org.apache.inlong.tubemq.server.common.utils.Sleeper;
@@ -106,6 +109,7 @@ import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
+import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
@@ -135,7 +139,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private final BrokerRunManager brokerRunManager; // broker run status manager
private final ConsumerEventManager consumerEventManager; //consumer event manager
private final TopicPSInfoManager topicPSInfoManager; //topic publish/subscribe info manager
- private final ExecutorService executor;
+ private final ExecutorService svrExecutor;
+ private final ExecutorService cltExecutor;
private final ProducerInfoHolder producerHolder; //producer holder
private final ConsumerInfoHolder consumerHolder; //consumer holder
private final RowLock masterRowLock; //lock
@@ -154,7 +159,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private boolean initialized = false;
private boolean startupBalance = true;
private int balanceDelayTimes = 0;
- private AtomicInteger curBalanceParal = new AtomicInteger(0);
+ private AtomicInteger curSvrBalanceParal = new AtomicInteger(0);
+ private AtomicInteger curCltBalanceParal = new AtomicInteger(0);
private Sleeper stopSleeper = new Sleeper(1000, this);
private SimpleVisitTokenManager visitTokenManager;
@@ -171,7 +177,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.checkAndCreateBdbDataPath();
this.masterAddInfo =
new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort());
- this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
+ this.svrExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
+ this.cltExecutor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
this.heartbeatManager = new HeartbeatManager();
@@ -488,7 +495,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
checkNodeStatus(producerId, strBuffer);
new ReleaseProducer().run(producerId);
heartbeatManager.unRegProducerNode(producerId);
- logger.info(strBuffer.append("Producer Closed")
+ logger.info(strBuffer.append("[Producer Closed] ")
.append(producerId).append(", isOverTLS=").append(overtls).toString());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -740,7 +747,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// authorize check
CertifiedResult authorizeResult =
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
- groupName, consumeGroupInfo.getTopicSet(), consumeGroupInfo.getTopicConditions(), rmtAddress);
+ groupName, consumeGroupInfo.getTopicSet(),
+ consumeGroupInfo.getTopicConditions(), rmtAddress);
if (!authorizeResult.result) {
builder.setErrCode(authorizeResult.errCode);
builder.setErrMsg(authorizeResult.errInfo);
@@ -1179,25 +1187,397 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
+ /**
+ * Client balance consumer register request with master
+ *
+ * @param request
+ * @param rmtAddress
+ * @param overtls
+ * @return register response
+ * @throws Throwable
+ */
@Override
public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
String rmtAddress,
boolean overtls) throws Throwable {
- return null;
+ ProcessResult result = new ProcessResult();
+ final StringBuilder sBuffer = new StringBuilder(512);
+ RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder();
+ CertifiedResult certResult =
+ serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+ if (!certResult.result) {
+ builder.setErrCode(certResult.errCode);
+ builder.setErrMsg(certResult.errInfo);
+ return builder.build();
+ }
+ ParamCheckResult paramCheckResult =
+ PBParameterUtils.checkClientId(request.getClientId(), sBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String consumerId = (String) paramCheckResult.checkData;
+ paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), sBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ //final String hostName = (String) paramCheckResult.checkData;
+ paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), sBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String groupName = (String) paramCheckResult.checkData;
+ paramCheckResult =
+ PBParameterUtils.checkConsumerTopicList(
+ request.getTopicListList(), sBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final Set<String> reqTopicSet = (Set<String>) paramCheckResult.checkData;
+ final Map<String, TreeSet<String>> reqTopicConditions =
+ DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
+ int sourceCount = request.getSourceCount();
+ int nodeId = request.getNodeId();
+ if (sourceCount > 0) {
+ if (nodeId < 0 || nodeId > (sourceCount - 1)) {
+ builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
+ builder.setErrMsg("Request nodeId value must be between in [0, sourceCount-1]!");
+ return builder.build();
+ }
+ }
+ final String clientJdkVer = request.hasJdkVersion()
+ ? request.getJdkVersion() : "";
+ final ConsumeType csmType = ConsumeType.CONSUME_CLIENT_REB;
+ OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
+ if (request.hasOpsTaskInfo()) {
+ opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
+ }
+ // check master current status
+ checkNodeStatus(consumerId, sBuffer);
+ ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
+ if (request.hasSubRepInfo()) {
+ clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
+ }
+ // build consumer object
+ ConsumerInfo inConsumerInfo =
+ new ConsumerInfo(consumerId, overtls, groupName, csmType,
+ sourceCount, nodeId, reqTopicSet, reqTopicConditions,
+ opsTaskInfo.getCsmFromMaxOffsetCtrlId(), clientSyncInfo);
+ // need removed for authorize center begin
+ if (!this.defMetaDataManager
+ .isConsumeTargetAuthorized(consumerId, groupName,
+ reqTopicSet, reqTopicConditions, sBuffer, result)) {
+ if (sBuffer.length() > 0) {
+ logger.warn(sBuffer.toString());
+ }
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
+ return builder.build();
+ }
+ // need removed for authorize center end
+ // check resource require
+ paramCheckResult =
+ PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
+ masterConfig, defMetaDataManager, brokerRunManager, sBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ CertifiedResult authorizeResult =
+ serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
+ groupName, reqTopicSet, reqTopicConditions, rmtAddress);
+ if (!authorizeResult.result) {
+ builder.setErrCode(authorizeResult.errCode);
+ builder.setErrMsg(authorizeResult.errInfo);
+ return builder.build();
+ }
+ Integer lid = null;
+ try {
+ lid = masterRowLock.getLock(null,
+ StringUtils.getBytesUtf8(consumerId), true);
+ if (!consumerHolder.addConsumer(inConsumerInfo, false, sBuffer, paramCheckResult)) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
+ heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
+ } catch (IOException e) {
+ logger.warn("Failed to lock.", e);
+ } finally {
+ if (lid != null) {
+ this.masterRowLock.releaseRowLock(lid);
+ }
+ }
+ ConsumeGroupInfo consumeGroupInfo =
+ consumerHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
+ logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+ .append(" visit consume group(").append(groupName)
+ .append(" info failure, null information").toString());
+ builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ builder.setErrMsg(sBuffer.toString());
+ sBuffer.delete(0, sBuffer.length());
+ return builder.build();
+ }
+ inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
+ if (inConsumerInfo == null) {
+ logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+ .append(" visit consume info failure, null information").toString());
+ builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
+ builder.setErrMsg(sBuffer.toString());
+ sBuffer.delete(0, sBuffer.length());
+ return builder.build();
+ }
+ Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
+ currentSubInfo.put(consumerId, topicPartSubMap);
+ Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
+ if (reportInfo.getF0()) {
+ for (Partition info : reportInfo.getF1()) {
+ Map<String, Partition> partMap =
+ topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
+ partMap.put(info.getPartitionKey(), info);
+ }
+ printReportInfo(consumerId, null, topicPartSubMap, sBuffer);
+ }
+ logger.info(sBuffer.append("[Consumer Register] ")
+ .append(consumerId).append(", isOverTLS=").append(overtls)
+ .append(", clientJDKVer=").append(clientJdkVer).toString());
+ sBuffer.delete(0, sBuffer.length());
+ Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+ brokerRunManager.getBrokerStaticInfo(overtls);
+ builder.setBrokerConfigId(brokerStaticInfo.getF0());
+ if (clientSyncInfo.getBrokerConfigId()
+ != brokerStaticInfo.getF0()) {
+ builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+ }
+ builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
+ builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+ builder.setErrCode(TErrCodeConstants.SUCCESS);
+ builder.setErrMsg("OK!");
+ return builder.build();
}
+ /**
+ * Client balance consumer heartbeat request with master
+ *
+ * @param request
+ * @param rmtAddress
+ * @param overtls
+ * @return heartbeat response
+ * @throws Throwable
+ */
@Override
public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request,
String rmtAddress,
boolean overtls) throws Throwable {
- return null;
+ final StringBuilder strBuffer = new StringBuilder(512);
+ // response
+ HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder();
+ // identity valid
+ CertifiedResult certResult =
+ serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+ if (!certResult.result) {
+ builder.setErrCode(certResult.errCode);
+ builder.setErrMsg(certResult.errInfo);
+ return builder.build();
+ }
+ ParamCheckResult paramCheckResult =
+ PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String clientId = (String) paramCheckResult.checkData;
+ paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String groupName = (String) paramCheckResult.checkData;
+ OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
+ if (request.hasOpsTaskInfo()) {
+ opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
+ }
+ // check master current status
+ checkNodeStatus(clientId, strBuffer);
+ ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
+ if (request.hasSubRepInfo()) {
+ clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
+ }
+ ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer.append("Not found groupName ")
+ .append(groupName).append(" in holder!").toString());
+ return builder.build();
+ }
+ ConsumerInfo inConsumerInfo =
+ consumeGroupInfo.getConsumerInfo(clientId);
+ if (inConsumerInfo == null) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+ .append(" in group(").append(groupName).append(")").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return builder.build();
+ }
+ // authorize check
+ CertifiedResult authorizeResult =
+ serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
+ groupName, consumeGroupInfo.getTopicSet(),
+ consumeGroupInfo.getTopicConditions(), rmtAddress);
+ if (!authorizeResult.result) {
+ builder.setErrCode(authorizeResult.errCode);
+ builder.setErrMsg(authorizeResult.errInfo);
+ return builder.build();
+ }
+ // heartbeat check
+ try {
+ heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
+ } catch (HeartbeatException e) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer
+ .append("Update consumer node exception:")
+ .append(e.getMessage()).toString());
+ return builder.build();
+ }
+ inConsumerInfo.updClientReportInfo(opsTaskInfo.getCsmFromMaxOffsetCtrlId(),
+ clientSyncInfo.getLstAssignedTime(), clientSyncInfo.getTopicMetaInfoId());
+ Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
+ if (reportInfo.getF0()) {
+ Map<String, Map<String, Partition>> curPartSubMap =
+ currentSubInfo.get(clientId);
+ Map<String, Map<String, Partition>> newPartSubMap = new HashMap<>();
+ for (Partition info : reportInfo.getF1()) {
+ Map<String, Partition> partMap =
+ newPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
+ partMap.put(info.getPartitionKey(), info);
+ }
+ printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuffer);
+ currentSubInfo.put(clientId, newPartSubMap);
+ }
+ Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+ brokerRunManager.getBrokerStaticInfo(overtls);
+ builder.setBrokerConfigId(brokerStaticInfo.getF0());
+ if (clientSyncInfo.getBrokerConfigId()
+ != brokerStaticInfo.getF0()) {
+ builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+ }
+ Map<String, Map<String, Partition>> curPartMap = currentSubInfo.get(clientId);
+ if (inConsumerInfo.getLstAssignedTime() >= 0
+ && (curPartMap != null && !curPartMap.isEmpty())
+ && (System.currentTimeMillis() - inConsumerInfo.getLstAssignedTime()
+ > masterConfig.getMaxMetaForceUpdatePeriodMs())) {
+ Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+ builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0());
+ if (topicMetaInfoTuple.getF0() != clientSyncInfo.getTopicMetaInfoId()) {
+ builder.addAllTopicMetaInfoList(topicMetaInfoTuple.getF1());
+ }
+ }
+ builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
+ builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
+ builder.setErrCode(TErrCodeConstants.SUCCESS);
+ builder.setErrMsg("OK!");
+ return builder.build();
}
+ /**
+ * Client balance consumer get partition meta information with master
+ *
+ * @param request
+ * @param rmtAddress
+ * @param overtls
+ * @return partition meta information response
+ * @throws Throwable
+ */
@Override
public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request,
String rmtAddress,
boolean overtls) throws Throwable {
- return null;
+ StringBuilder strBuffer = new StringBuilder(512);
+ GetPartMetaResponseM2C.Builder builder = GetPartMetaResponseM2C.newBuilder();
+ CertifiedResult certResult =
+ serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
+ if (!certResult.result) {
+ builder.setErrCode(certResult.errCode);
+ builder.setErrMsg(certResult.errInfo);
+ return builder.build();
+ }
+ ParamCheckResult paramCheckResult =
+ PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String clientId = (String) paramCheckResult.checkData;
+ paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
+ if (!paramCheckResult.result) {
+ builder.setErrCode(paramCheckResult.errCode);
+ builder.setErrMsg(paramCheckResult.errMsg);
+ return builder.build();
+ }
+ final String groupName = (String) paramCheckResult.checkData;
+ final long brokerConfigId = request.getBrokerConfigId();
+ final long topicMetaInfoId = request.getTopicMetaInfoId();
+ checkNodeStatus(clientId, strBuffer);
+ // get control object
+ ConsumeGroupInfo consumeGroupInfo =
+ consumerHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer.append("Not found groupName ")
+ .append(groupName).append(" in holder!").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return builder.build();
+ }
+ ConsumerInfo inConsumerInfo =
+ consumeGroupInfo.getConsumerInfo(clientId);
+ if (inConsumerInfo == null) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+ .append(" in group(").append(groupName).append(")").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return builder.build();
+ }
+ // heartbeat check
+ try {
+ heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
+ } catch (HeartbeatException e) {
+ builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
+ builder.setErrMsg(strBuffer
+ .append("Update consumer node exception:")
+ .append(e.getMessage()).toString());
+ return builder.build();
+ }
+ Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+ if (topicMetaInfoTuple.getF0() == TBaseConstants.META_VALUE_UNDEFINED) {
+ freshTopicMetaInfo(consumeGroupInfo, strBuffer);
+ topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
+ }
+ builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0());
+ if (topicMetaInfoTuple.getF0() != topicMetaInfoId) {
+ builder.addAllTopicMetaInfoList(topicMetaInfoTuple.getF1());
+ }
+ Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+ brokerRunManager.getBrokerStaticInfo(overtls);
+ builder.setBrokerConfigId(brokerStaticInfo.getF0());
+ if (brokerConfigId != brokerStaticInfo.getF0()) {
+ builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
+ }
+ builder.setErrCode(TErrCodeConstants.SUCCESS);
+ builder.setErrMsg("OK!");
+ return builder.build();
}
/**
@@ -1237,7 +1617,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public void run() {
try {
if (!this.stopped) {
- Thread.sleep(masterConfig.getFirstBalanceDelayAfterStartMs());
this.balancerChore = startBalancerChore(this);
initialized = true;
while (!this.stopped) {
@@ -1258,44 +1637,60 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
*/
private void balance(final TMaster tMaster) {
final StringBuilder strBuffer = new StringBuilder(512);
- final long rebalanceId = idGenerator.incrementAndGet();
+ final long balanceId = idGenerator.incrementAndGet();
if (defMetaDataManager != null) {
- logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
+ logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
.append(", isMaster=").append(defMetaDataManager.isSelfMaster())
.append(", isPrimaryNodeActive=")
.append(defMetaDataManager.isPrimaryNodeActive()).toString());
} else {
- logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId)
+ logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
.append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
}
strBuffer.delete(0, strBuffer.length());
- int curDoingTasks = this.curBalanceParal.get();
+ // process client-balance
+ processClientBalanceMetaInfo(balanceId, strBuffer);
+ // process server-balance
+ processServerBalance(tMaster, balanceId, strBuffer);
+ logger.info(strBuffer.append("[Balance End] ").append(balanceId).toString());
+ }
+
+ private void processServerBalance(TMaster tMaster,
+ long balanceId,
+ StringBuilder sBuffer) {
+ int curDoingTasks = this.curSvrBalanceParal.get();
if (curDoingTasks > 0) {
- logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId)
- .append(", the previous rebalance has ")
- .append(curDoingTasks).append(" task(s) in progress!").toString());
+ logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId)
+ .append(" the Server-Balance has ").append(curDoingTasks)
+ .append(" task(s) in progress!").toString());
+ sBuffer.delete(0, sBuffer.length());
return;
}
final boolean isStartBalance = startupBalance;
List<String> groupsNeedToBalance = isStartBalance
- ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(strBuffer);
- strBuffer.delete(0, strBuffer.length());
- if (!groupsNeedToBalance.isEmpty()) {
- // set parallel rebalance signal
- curBalanceParal.set(masterConfig.getRebalanceParallel());
+ ? consumerHolder.getAllServerBalanceGroups() : getNeedToBalanceGroups(sBuffer);
+ sBuffer.delete(0, sBuffer.length());
+ int balanceTaskCnt = groupsNeedToBalance.size();
+ if (balanceTaskCnt > 0) {
// calculate process count
- int unitNum = (groupsNeedToBalance.size() + masterConfig.getRebalanceParallel() - 1)
+ int unitNum = (balanceTaskCnt + masterConfig.getRebalanceParallel() - 1)
/ masterConfig.getRebalanceParallel();
- // start processer to do reblance;
+ // start processor to do balance;
int startIndex = 0;
int endIndex = 0;
+ // set parallel balance signal
+ curSvrBalanceParal.set(masterConfig.getRebalanceParallel());
for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
- // get groups need to rebalance
- startIndex = Math.min((i) * unitNum, groupsNeedToBalance.size());
- endIndex = Math.min((i + 1) * unitNum, groupsNeedToBalance.size());
+ // get groups need to balance
+ startIndex = Math.min((i) * unitNum, balanceTaskCnt);
+ endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex);
- // execute rebalance
- this.executor.execute(new Runnable() {
+ if (subGroups.isEmpty()) {
+ curSvrBalanceParal.decrementAndGet();
+ continue;
+ }
+ // execute balance
+ this.svrExecutor.execute(new Runnable() {
@Override
public void run() {
try {
@@ -1304,11 +1699,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
// first process reset rebalance task;
try {
- tMaster.processResetbalance(rebalanceId,
+ tMaster.processResetbalance(balanceId,
isStartBalance, subGroups);
} catch (Throwable e) {
logger.warn(new StringBuilder(1024)
- .append("[Rebalance processor] Error during reset-reb,")
+ .append("[Svr-Balance processor] Error during reset-reb,")
.append("the groups that may be affected are ")
.append(subGroups).append(",error is ")
.append(e).toString());
@@ -1316,31 +1711,148 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (tMaster.isStopped()) {
return;
}
- // second process normal rebalance task;
+ // second process normal balance task;
try {
- tMaster.processRebalance(rebalanceId,
+ tMaster.processRebalance(balanceId,
isStartBalance, subGroups);
} catch (Throwable e) {
logger.warn(new StringBuilder(1024)
- .append("[Rebalance processor] Error during normal-reb,")
+ .append("[Svr-Balance processor] Error during normal-reb,")
.append("the groups that may be affected are ")
.append(subGroups).append(",error is ")
.append(e).toString());
}
} catch (Throwable e) {
- logger.warn("[Rebalance processor] Error during process", e);
+ logger.warn("[Svr-Balance processor] Error during process", e);
} finally {
- curBalanceParal.decrementAndGet();
+ curSvrBalanceParal.decrementAndGet();
}
}
});
}
}
startupBalance = false;
- logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId).toString());
+ logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+
+ private void processClientBalanceMetaInfo(long balanceId, StringBuilder sBuffer) {
+ int curDoingTasks = this.curCltBalanceParal.get();
+ if (curDoingTasks > 0) {
+ logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId)
+ .append(" the Client-Balance has ").append(curDoingTasks)
+ .append(" task(s) in progress!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return;
+ }
+ List<String> clientGroups = consumerHolder.getAllClientBalanceGroups();
+ if (!clientGroups.isEmpty()) {
+ int balanceTaskCnt = clientGroups.size();
+ // calculate process count
+ int unitNum = (balanceTaskCnt + masterConfig.getRebalanceParallel() - 1)
+ / masterConfig.getRebalanceParallel();
+ // start processor to do balance;
+ int startIndex = 0;
+ int endIndex = 0;
+ // set parallel balance signal
+ curCltBalanceParal.set(masterConfig.getRebalanceParallel());
+ for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) {
+ // get groups need to rebalance
+ startIndex = Math.min((i) * unitNum, balanceTaskCnt);
+ endIndex = Math.min((i + 1) * unitNum, balanceTaskCnt);
+ final List<String> subGroups = clientGroups.subList(startIndex, endIndex);
+ if (subGroups.isEmpty()) {
+ curCltBalanceParal.decrementAndGet();
+ continue;
+ }
+ // execute balance
+ this.cltExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (subGroups.isEmpty()) {
+ return;
+ }
+ ConsumeGroupInfo consumeGroupInfo;
+ StringBuilder sBuffer2 = new StringBuilder(512);
+ for (String groupName : subGroups) {
+ consumeGroupInfo =
+ consumerHolder.getConsumeGroupInfo(groupName);
+ if (consumeGroupInfo == null) {
+ continue;
+ }
+ freshTopicMetaInfo(consumeGroupInfo, sBuffer2);
+ }
+ } catch (Throwable e) {
+ logger.warn("[Clt-Balance processor] Error during process", e);
+ } finally {
+ curCltBalanceParal.decrementAndGet();
+ }
+ }
+ });
+ }
+ }
+ logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId).toString());
+ sBuffer.delete(0, sBuffer.length());
}
- // process unReset group rebalance
+ public void freshTopicMetaInfo(ConsumeGroupInfo consumeGroupInfo, StringBuilder sBuffer) {
+ Map<String, String> topicMetaInfoMap =
+ getTopicConfigInfo(consumeGroupInfo, sBuffer);
+ consumeGroupInfo.updCsmTopicMetaInfo(topicMetaInfoMap);
+ }
+
+ private Map<String, String> getTopicConfigInfo(ConsumeGroupInfo groupInfo,
+ StringBuilder sBuffer) {
+ Map<String, String> result = new HashMap<>();
+ if (groupInfo.getTopicSet() == null || groupInfo.getTopicSet().isEmpty()) {
+ return result;
+ }
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+ defMetaDataManager.getTopicConfMapByTopicAndBrokerIds(
+ groupInfo.getTopicSet(), null);
+ if (topicDeployInfoMap == null || topicDeployInfoMap.isEmpty()) {
+ return result;
+ }
+ int count = 0;
+ int statusId = 0;
+ TopicInfo topicInfo;
+ Set<String> fbdTopicSet =
+ defMetaDataManager.getDisableTopicByGroupName(groupInfo.getGroupName());
+ for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
+ if (entry == null) {
+ continue;
+ }
+ count = 0;
+ statusId = 0;
+ for (TopicDeployEntity deployInfo : entry.getValue()) {
+ topicInfo =
+ brokerRunManager.getPubBrokerTopicInfo(
+ deployInfo.getBrokerId(), deployInfo.getTopicName());
+ if (topicInfo != null
+ && topicInfo.isAcceptSubscribe()
+ && !fbdTopicSet.contains(topicInfo.getTopic())) {
+ statusId = 1;
+ }
+ if (count++ == 0) {
+ sBuffer.append(entry.getKey()).append(TokenConstants.SEGMENT_SEP);
+ } else {
+ sBuffer.append(TokenConstants.ARRAY_SEP);
+ }
+ sBuffer.append(deployInfo.getBrokerId()).append(TokenConstants.ATTR_SEP)
+ .append(deployInfo.getNumTopicStores()).append(TokenConstants.ATTR_SEP)
+ .append(deployInfo.getNumPartitions()).append(TokenConstants.ATTR_SEP)
+ .append(statusId);
+ }
+ if (count > 0) {
+ result.put(entry.getKey(), sBuffer.toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ }
+ return result;
+ }
+
+ // process unReset group balance
public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
// #lizard forgives
Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
@@ -1733,6 +2245,158 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return outClientConfig;
}
+ private ClientMaster.OpsTaskInfo.Builder buildOpsTaskInfo(ConsumeGroupInfo groupInfo,
+ ConsumerInfo nodeInfo,
+ OpsSyncInfo opsTaskInfo) {
+ ClientMaster.OpsTaskInfo.Builder builder = ClientMaster.OpsTaskInfo.newBuilder();
+ long csmFromMaxCtrlId = groupInfo.getCsmFromMaxCtrlId();
+ if (csmFromMaxCtrlId != TBaseConstants.META_VALUE_UNDEFINED
+ && nodeInfo.getCsmFromMaxOffsetCtrlId() != csmFromMaxCtrlId) {
+ builder.setCsmFrmMaxOffsetCtrlId(csmFromMaxCtrlId);
+ }
+ ClusterSettingEntity defSetting =
+ defMetaDataManager.getClusterDefSetting(false);
+ GroupResCtrlEntity groupResCtrlConf =
+ defMetaDataManager.confGetGroupResCtrlConf(nodeInfo.getGroupName());
+ if (defSetting.enableFlowCtrl()) {
+ builder.setDefFlowCheckId(defSetting.getSerialId());
+ if (opsTaskInfo.getDefFlowChkId() != defSetting.getSerialId()) {
+ builder.setDefFlowControlInfo(defSetting.getGloFlowCtrlRuleInfo());
+ }
+ }
+ if (groupResCtrlConf != null
+ && groupResCtrlConf.isFlowCtrlEnable()) {
+ builder.setGroupFlowCheckId(groupResCtrlConf.getSerialId());
+ builder.setQryPriorityId(groupResCtrlConf.getQryPriorityId());
+ if (opsTaskInfo.getGroupFlowChkId() != groupResCtrlConf.getSerialId()) {
+ builder.setGroupFlowControlInfo(groupResCtrlConf.getFlowCtrlInfo());
+ }
+ }
+ return builder;
+ }
+
+ private void printReportInfo(String consumerId,
+ Map<String, Map<String, Partition>> curPartSubMa,
+ Map<String, Map<String, Partition>> newPartSubMap,
+ StringBuilder sBuffer) {
+ boolean printHeader = true;
+ if (curPartSubMa == null || curPartSubMa.isEmpty()) {
+ if (newPartSubMap != null && !newPartSubMap.isEmpty()) {
+ for (Map<String, Partition> newPartMap : newPartSubMap.values()) {
+ if (newPartMap == null || newPartMap.isEmpty()) {
+ continue;
+ }
+ for (Partition info : newPartMap.values()) {
+ printHeader = printReportItemInfo(consumerId,
+ info, true, printHeader, sBuffer);
+ }
+ }
+ }
+ } else {
+ if (newPartSubMap == null || newPartSubMap.isEmpty()) {
+ for (Map<String, Partition> curPartMap : curPartSubMa.values()) {
+ if (curPartMap == null || curPartMap.isEmpty()) {
+ continue;
+ }
+ for (Partition info : curPartMap.values()) {
+ printHeader = printReportItemInfo(consumerId,
+ info, false, printHeader, sBuffer);
+ }
+ }
+ } else {
+ for (Map.Entry<String, Map<String, Partition>> curEntry
+ : curPartSubMa.entrySet()) {
+ if (curEntry == null || curEntry.getKey() == null) {
+ continue;
+ }
+ Map<String, Partition> newPartMap =
+ newPartSubMap.get(curEntry.getKey());
+ if (newPartMap == null || newPartMap.isEmpty()) {
+ Map<String, Partition> curPartMap = curEntry.getValue();
+ if (curPartMap == null || curPartMap.isEmpty()) {
+ continue;
+ }
+ for (Partition info : curPartMap.values()) {
+ printHeader = printReportItemInfo(consumerId,
+ info, false, printHeader, sBuffer);
+ }
+ } else {
+ Map<String, Partition> curPartMap = curEntry.getValue();
+ if (curPartMap == null || curPartMap.isEmpty()) {
+ for (Partition info : newPartMap.values()) {
+ printHeader = printReportItemInfo(consumerId,
+ info, true, printHeader, sBuffer);
+ }
+ } else {
+ for (Map.Entry<String, Partition> curPartEntry
+ : curPartMap.entrySet()) {
+ if (curPartEntry == null
+ || curPartEntry.getKey() == null
+ || curPartEntry.getValue() == null) {
+ continue;
+ }
+ if (newPartMap.get(curPartEntry.getKey()) == null) {
+ printHeader = printReportItemInfo(consumerId,
+ curPartEntry.getValue(), false, printHeader, sBuffer);
+ }
+ }
+ for (Map.Entry<String, Partition> newPartEntry
+ : newPartMap.entrySet()) {
+ if (newPartEntry == null
+ || newPartEntry.getKey() == null
+ || newPartEntry.getValue() == null) {
+ continue;
+ }
+ if (curPartMap.get(newPartEntry.getKey()) == null) {
+ printHeader = printReportItemInfo(consumerId,
+ newPartEntry.getValue(), true, printHeader, sBuffer);
+ }
+ }
+ }
+ }
+ }
+ for (Map.Entry<String, Map<String, Partition>> newEntry
+ : newPartSubMap.entrySet()) {
+ if (newEntry == null || newEntry.getKey() == null) {
+ continue;
+ }
+ if (curPartSubMa.get(newEntry.getKey()) != null) {
+ continue;
+ }
+ Map<String, Partition> newPartMap = newEntry.getValue();
+ if (newPartMap == null || newPartMap.isEmpty()) {
+ continue;
+ }
+ for (Partition info : newPartMap.values()) {
+ printHeader = printReportItemInfo(consumerId,
+ info, true, printHeader, sBuffer);
+ }
+ }
+ }
+ }
+ if (sBuffer.length() > 0) {
+ logger.info(sBuffer.append("]").toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+
+ }
+
+ private boolean printReportItemInfo(String consumerId, Partition info,
+ boolean isAdd, boolean printHeader,
+ StringBuilder sBuffer) {
+ if (info == null) {
+ return printHeader;
+ }
+ String type = isAdd ? "[added]" : "[deleted]";
+ if (printHeader) {
+ sBuffer.append("[SubInfo Report] client ")
+ .append(consumerId).append(" report the info: [");
+ printHeader = false;
+ }
+ sBuffer.append(type).append(info.toString()).append(", ");
+ return printHeader;
+ }
+
/**
* build cluster configure info
*
@@ -1795,7 +2459,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
try {
webServer.stop();
rpcServiceFactory.destroy();
- executor.shutdown();
+ svrExecutor.shutdown();
+ cltExecutor.shutdown();
stopChores();
heartbeatManager.stop();
zkOffsetStorage.close();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index 0238109..a13c8b2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.slf4j.Logger;
@@ -264,6 +265,44 @@ public class ConsumeGroupInfo {
}
}
+ public void updCsmFromMaxCtrlId() {
+ csmCtrlId.set(System.currentTimeMillis());
+ }
+
+ public void updCsmTopicMetaInfo(Map<String, String> result) {
+ lastMetaInfoFreshTime.set(System.currentTimeMillis());
+ if (result == null || result.isEmpty()) {
+ return;
+ }
+ String newConfig;
+ String curCOnfig;
+ boolean isChanged = false;
+ Set<String> newTopics = result.keySet();
+ Set<String> curTopics = topicMetaInfoMap.keySet();
+ if (newTopics.size() != curTopics.size()
+ || !newTopics.containsAll(curTopics)) {
+ isChanged = true;
+ } else {
+ for (String topicKey : newTopics) {
+ newConfig = result.get(topicKey);
+ curCOnfig = topicMetaInfoMap.get(topicKey);
+ if (newConfig == null) {
+ continue;
+ }
+ if (!newConfig.equals(curCOnfig)) {
+ isChanged = true;
+ break;
+ }
+ }
+ }
+ if (isChanged) {
+ for (String newTopic : newTopics) {
+ topicMetaInfoMap.put(newTopic, result.get(newTopic));
+ }
+ topicMetaInfoId.set(System.currentTimeMillis());
+ }
+ }
+
public boolean isUnReadyServerBalance() {
return (consumeType == ConsumeType.CONSUME_BAND
&& notAllocate.get()
@@ -411,6 +450,25 @@ public class ConsumeGroupInfo {
return this.topicConditions;
}
+ public long getCsmFromMaxCtrlId() {
+ return csmCtrlId.get();
+ }
+
+ public Tuple2<Long, List<String>> getTopicMetaInfo() {
+ List<String> topicMetaInfoList = new ArrayList<>();
+ for (String metaInfo : topicMetaInfoMap.values()) {
+ if (TStringUtils.isBlank(metaInfo)) {
+ continue;
+ }
+ topicMetaInfoList.add(metaInfo);
+ }
+ return new Tuple2<>(topicMetaInfoId.get(), topicMetaInfoList);
+ }
+
+ public AtomicLong getLastMetaInfoFreshTime() {
+ return lastMetaInfoFreshTime;
+ }
+
public List<ConsumerInfo> getConsumerInfoList() {
try {
csmInfoRWLock.readLock().lock();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
deleted file mode 100644
index 766af32..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/TopicConfigInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.master.nodemanage.nodeconsumer;
-
-import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
-
-public class TopicConfigInfo implements Comparable<TopicConfigInfo> {
- private final int brokerId;
- private final String topicName;
- private final int numStore;
- private final int numPart;
- private boolean acceptSub;
-
- public TopicConfigInfo(TopicDeployEntity topicDeployEntity) {
- this.brokerId = topicDeployEntity.getBrokerId();
- this.topicName = topicDeployEntity.getTopicName();
- this.numStore = topicDeployEntity.getNumTopicStores();
- this.numPart = topicDeployEntity.getNumPartitions();
- this.acceptSub = false;
- }
-
- public int getBrokerId() {
- return brokerId;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public int getNumStore() {
- return numStore;
- }
-
- public void setAcceptSub(boolean acceptSub) {
- this.acceptSub = acceptSub;
- }
-
- public boolean isAcceptSub() {
- return acceptSub;
- }
-
- public int getNumPart() {
- return numPart;
- }
-
- @Override
- public int compareTo(TopicConfigInfo o) {
- return Integer.compare(this.brokerId, o.brokerId);
- }
-}