You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/19 07:54:38 UTC
[incubator-inlong] branch TUBEMQ-570 updated: [INLONG-602] Add
replacement processing after metadata changes (#468)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new 4f68e8a [INLONG-602] Add replacement processing after metadata changes (#468)
4f68e8a is described below
commit 4f68e8a978a1e849a2acbc23ec29d25fd5b012e1
Author: gosonzhang <go...@apache.org>
AuthorDate: Wed May 19 15:54:30 2021 +0800
[INLONG-602] Add replacement processing after metadata changes (#468)
---
.../server/common/heartbeat/HeartbeatManager.java | 43 +-
.../server/common/paramcheck/PBParameterUtils.java | 16 +-
.../server/common/utils/WebParameterUtils.java | 18 +-
.../org/apache/tubemq/server/master/TMaster.java | 826 +++-----------------
.../server/master/balance/DefaultLoadBalancer.java | 62 +-
.../tubemq/server/master/balance/LoadBalancer.java | 12 +-
.../server/master/metamanage/MetaDataManager.java | 486 ++----------
.../impl/bdbimpl/BdbBrokerConfigMapperImpl.java | 4 +-
...{BrokerInfoHolder.java => BrokerAbnHolder.java} | 79 +-
.../nodemanage/nodebroker/BrokerPSInfoHolder.java | 9 +-
.../nodemanage/nodebroker/BrokerRunManager.java | 71 +-
.../nodemanage/nodebroker/BrokerRunStatusInfo.java | 62 +-
.../nodemanage/nodebroker/BrokerSyncData.java | 38 +-
.../nodebroker/BrokerSyncStatusInfo.java | 843 ---------------------
.../nodemanage/nodebroker/BrokerTopicInfoView.java | 131 ++--
.../nodemanage/nodebroker/DefBrokerRunManager.java | 504 ++++++++++++
.../nodemanage/nodebroker/TopicPSInfoManager.java | 150 ----
.../server/master/web/action/screen/Master.java | 15 +-
.../web/action/screen/config/BrokerList.java | 12 +-
.../master/web/handler/WebBrokerConfHandler.java | 56 +-
.../master/web/handler/WebMasterInfoHandler.java | 12 +-
.../master/web/handler/WebTopicDeployHandler.java | 30 +-
.../tubemq/server/common/HeartbeatManagerTest.java | 2 +-
.../nodebroker/BrokerInfoHolderTest.java | 47 --
24 files changed, 1039 insertions(+), 2489 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index c000b31..dc83412 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -23,8 +23,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+
+import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.exception.HeartbeatException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,10 +166,12 @@ public class HeartbeatManager {
* Register a node as broker.
*
* @param nodeId the id of a node to be registered.
+ * @param createId broker run-info block id
* @return the timeout info for the registered node
*/
- public TimeoutInfo regBrokerNode(final String nodeId) {
- return this.brokerRegMap.put(nodeId, new TimeoutInfo(this.brokerTimeoutDlt));
+ public TimeoutInfo regBrokerNode(String nodeId, String createId) {
+ return this.brokerRegMap.put(nodeId,
+ new TimeoutInfo(createId, this.brokerTimeoutDlt));
}
/**
@@ -208,10 +213,18 @@ public class HeartbeatManager {
* Unregister a node from the broker
*
* @param nodeId the id of node to be unregistered
- * @return the timeout of the node
+ * @return if the timeout delete, true: success, false: failure
*/
- public TimeoutInfo unRegBrokerNode(final String nodeId) {
- return brokerRegMap.remove(nodeId);
+ public boolean unRegBrokerNode(String nodeId, String createId) {
+ TimeoutInfo timeoutInfo = brokerRegMap.get(nodeId);
+ if (timeoutInfo == null) {
+ return true;
+ }
+ if (!createId.equals(timeoutInfo.getSecondKey())) {
+ return false;
+ }
+ timeoutInfo = brokerRegMap.remove(nodeId);
+ return true;
}
/**
@@ -240,14 +253,26 @@ public class HeartbeatManager {
* @param nodeId the id of node to be updated
* @throws HeartbeatException if the timeout info of the node is not found
*/
- public void updBrokerNode(final String nodeId) throws HeartbeatException {
+ public boolean updBrokerNode(String nodeId, String createId,
+ StringBuilder sBuffer, ProcessResult result) {
TimeoutInfo timeoutInfo = brokerRegMap.get(nodeId);
if (timeoutInfo == null) {
- throw new HeartbeatException(new StringBuilder(512)
- .append("Invalid node id:").append(nodeId)
- .append(", you have to append node first!").toString());
+ result.setFailResult(TErrCodeConstants.HB_NO_NODE,
+ sBuffer.append("Invalid node id:").append(nodeId)
+ .append(", you have to append node first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (createId.equals(timeoutInfo.getSecondKey())) {
+ result.setFailResult(TErrCodeConstants.HB_NO_NODE,
+ sBuffer.append("Invalid node block id:").append(nodeId)
+ .append(", you have to append node first!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
}
timeoutInfo.updTimeoutTime(this.brokerTimeoutDlt);
+ result.setSuccResult(null);
+ return result.isSuccess();
}
/**
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 71b62d5..576b905 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -36,7 +36,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.MasterConfig;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,10 +181,10 @@ public class PBParameterUtils {
}
public static ParamCheckResult checkConsumerInputInfo(ConsumerInfo inConsumerInfo,
- final MasterConfig masterConfig,
- final MetaDataManager defMetaDataManager,
- final TopicPSInfoManager topicPSInfoManager,
- final StringBuilder strBuffer) throws Exception {
+ MasterConfig masterConfig,
+ MetaDataManager defMetaDataManager,
+ BrokerRunManager brokerRunManager,
+ StringBuilder strBuffer) throws Exception {
ParamCheckResult retResult = new ParamCheckResult();
if (!inConsumerInfo.isRequireBound()) {
retResult.setCheckData(inConsumerInfo);
@@ -217,7 +217,8 @@ public class PBParameterUtils {
int allowRate = (offsetResetGroupEntity != null
&& offsetResetGroupEntity.getAllowedBrokerClientRate() > 0)
? offsetResetGroupEntity.getAllowedBrokerClientRate() : masterConfig.getMaxGroupBrokerConsumeRate();
- int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(inConsumerInfo.getTopicSet());
+ int maxBrokerCount =
+ brokerRunManager.getSubTopicMaxBrokerCount(inConsumerInfo.getTopicSet());
int curBClientRate = (int) Math.floor(maxBrokerCount / inConsumerInfo.getSourceCount());
if (curBClientRate > allowRate) {
int minClientCnt = (int) (maxBrokerCount / allowRate);
@@ -412,7 +413,7 @@ public class PBParameterUtils {
}
String tmpValue = brokerId.trim();
try {
- Integer.parseInt(tmpValue);
+ retResult.setCheckData(Integer.parseInt(tmpValue));
} catch (Throwable e) {
retResult.setCheckResult(false,
TErrCodeConstants.BAD_REQUEST,
@@ -420,7 +421,6 @@ public class PBParameterUtils {
strBuffer.delete(0, strBuffer.length());
return retResult;
}
- retResult.setCheckData(tmpValue);
return retResult;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 876fcca..db93e04 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -48,7 +48,8 @@ import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BaseEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+
+
public class WebParameterUtils {
@@ -1169,21 +1170,6 @@ public class WebParameterUtils {
return true;
}
- public static boolean checkBrokerInOfflining(int brokerId,
- int manageStatus,
- MetaDataManager metaManager) {
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- metaManager.getBrokerRunSyncStatusInfo(brokerId);
- if ((brokerSyncStatusInfo != null)
- && (brokerSyncStatusInfo.isBrokerRegister())) {
- if ((manageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE)
- && (brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
- return true;
- }
- }
- return false;
- }
-
/**
* check the filter conditions and get them
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index c8f99e4..c0ae34a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -46,7 +46,6 @@ import org.apache.tubemq.corebase.cluster.NodeAddrInfo;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.cluster.ProducerInfo;
import org.apache.tubemq.corebase.cluster.SubscribeInfo;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.config.TLSConfig;
import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
@@ -82,8 +81,6 @@ import org.apache.tubemq.corerpc.RpcServiceFactory;
import org.apache.tubemq.corerpc.exception.StandbyException;
import org.apache.tubemq.corerpc.service.MasterService;
import org.apache.tubemq.server.Stoppable;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.aaaserver.CertificateMasterHandler;
import org.apache.tubemq.server.common.aaaserver.CertifiedResult;
import org.apache.tubemq.server.common.aaaserver.SimpleCertificateMasterHandler;
@@ -95,7 +92,6 @@ import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.tubemq.server.common.offsetstorage.ZkOffsetStorage;
import org.apache.tubemq.server.common.paramcheck.PBParameterUtils;
import org.apache.tubemq.server.common.paramcheck.ParamCheckResult;
-import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.utils.HasThread;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.RowLock;
@@ -106,8 +102,9 @@ import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerEventManager;
@@ -129,10 +126,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
currentSubInfo = new ConcurrentHashMap<>();
private final RpcServiceFactory rpcServiceFactory = //rpc service factory
new RpcServiceFactory();
+ private final MetaDataManager defMetaDataManager; // meta data manager
+ private final BrokerRunManager brokerRunManager; // broker run status manager
private final ConsumerEventManager consumerEventManager; //consumer event manager
private final TopicPSInfoManager topicPSInfoManager; //topic publish/subscribe info manager
private final ExecutorService executor;
- private final BrokerInfoHolder brokerHolder; //broker holder
private final ProducerInfoHolder producerHolder; //producer holder
private final ConsumerInfoHolder consumerHolder; //consumer holder
private final RowLock masterRowLock; //lock
@@ -143,7 +141,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private final HeartbeatManager heartbeatManager; //heartbeat manager
private final OffsetStorage zkOffsetStorage; //zookeeper offset manager
private final ShutdownHook shutdownHook; //shutdown hook
- private final MetaDataManager defMetaDataManager;
private final CertificateMasterHandler serverAuthHandler; //server auth handler
private AtomicBoolean shutdownHooked = new AtomicBoolean(false);
private AtomicLong idGenerator = new AtomicLong(0); //id generator
@@ -172,14 +169,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel());
this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig);
this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig);
+ this.heartbeatManager = new HeartbeatManager();
+ this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
+ false, TBaseConstants.META_VALUE_UNDEFINED);
this.producerHolder = new ProducerInfoHolder();
this.consumerHolder = new ConsumerInfoHolder();
this.consumerEventManager = new ConsumerEventManager(consumerHolder);
this.topicPSInfoManager = new TopicPSInfoManager(this);
this.loadBalancer = new DefaultLoadBalancer();
- this.zkOffsetStorage = new ZkOffsetStorage(this.masterConfig.getZkConfig(),
- false, TBaseConstants.META_VALUE_UNDEFINED);
- this.heartbeatManager = new HeartbeatManager();
heartbeatManager.regConsumerCheckBusiness(masterConfig.getConsumerHeartbeatTimeoutMs(),
new TimeoutListener() {
@Override
@@ -198,22 +195,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
new ReleaseProducer().run(nodeId);
}
});
- heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
- new TimeoutListener() {
- @Override
- public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) throws Exception {
- logger.info(new StringBuilder(512).append("[Broker Timeout] ")
- .append(nodeId).toString());
- new ReleaseBroker().run(nodeId);
- }
- });
-
- this.defMetaDataManager = new MetaDataManager(masterConfig.getHostName(),
- masterConfig.getMetaDataPath(), masterConfig.getReplicationConfig());
+ this.defMetaDataManager = new MetaDataManager(this);
+ this.brokerRunManager = new DefBrokerRunManager(this);
this.defMetaDataManager.start();
- this.brokerHolder =
- new BrokerInfoHolder(this.masterConfig.getMaxAutoForbiddenCnt(),
- this.defMetaDataManager);
RpcConfig rpcTcpConfig = new RpcConfig();
rpcTcpConfig.put(RpcConstants.REQUEST_TIMEOUT,
masterConfig.getRpcReadTimeoutMs());
@@ -277,6 +261,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return defMetaDataManager;
}
+ public HeartbeatManager getHeartbeatManager() {
+ return heartbeatManager;
+ }
+
+ public BrokerRunManager getBrokerRunManager() {
+ return brokerRunManager;
+ }
+
/**
* Producer register request to master
*
@@ -342,8 +334,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
heartbeatManager.regProducerNode(producerId);
producerHolder.setProducerInfo(producerId,
new HashSet<>(transTopicSet), hostName, overtls);
- builder.setBrokerCheckSum(this.defMetaDataManager.getBrokerInfoCheckSum());
- builder.addAllBrokerInfos(this.defMetaDataManager.getBrokersMap(overtls).values());
+ Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+ brokerRunManager.getBrokerStaticInfo(overtls);
+ builder.setBrokerCheckSum(brokerStaticInfo.getF0());
+ builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
buildApprovedClientConfig(request.getAppdConfig());
@@ -433,10 +427,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
transTopicSet, hostName, overtls);
Map<String, String> availTopicPartitions = getProducerTopicPartitionInfo(producerId);
builder.addAllTopicInfos(availTopicPartitions.values());
- builder.setBrokerCheckSum(defMetaDataManager.getBrokerInfoCheckSum());
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
- if (defMetaDataManager.getBrokerInfoCheckSum() != inBrokerCheckSum) {
- builder.addAllBrokerInfos(defMetaDataManager.getBrokersMap(overtls).values());
+ Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
+ brokerRunManager.getBrokerStaticInfo(overtls);
+ builder.setBrokerCheckSum(brokerStaticInfo.getF0());
+ if (brokerStaticInfo.getF0() != inBrokerCheckSum) {
+ builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
}
ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
buildApprovedClientConfig(request.getAppdConfig());
@@ -577,7 +573,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
sessionKey, sessionTime, sourceCount, requiredPartMap);
paramCheckResult =
PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
- masterConfig, defMetaDataManager, topicPSInfoManager, strBuffer);
+ masterConfig, defMetaDataManager, brokerRunManager, strBuffer);
if (!paramCheckResult.result) {
builder.setErrCode(paramCheckResult.errCode);
builder.setErrMsg(paramCheckResult.errMsg);
@@ -939,6 +935,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final String rmtAddress,
boolean overtls) throws Exception {
// #lizard forgives
+ ProcessResult result = new ProcessResult();
final StringBuilder strBuffer = new StringBuilder(512);
RegisterResponseM2B.Builder builder = RegisterResponseM2B.newBuilder();
builder.setSuccess(false);
@@ -946,11 +943,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setStopWrite(false);
builder.setTakeConfInfo(false);
// auth
- CertifiedResult result =
+ CertifiedResult cfResult =
serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
- if (!result.result) {
- builder.setErrCode(result.errCode);
- builder.setErrMsg(result.errInfo);
+ if (!cfResult.result) {
+ builder.setErrCode(cfResult.errCode);
+ builder.setErrMsg(cfResult.errInfo);
return builder.build();
}
// get clientId and check valid
@@ -964,88 +961,29 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final String clientId = (String) paramCheckResult.checkData;
// check authority
checkNodeStatus(clientId, strBuffer);
- // check broker validity
- //
- BrokerInfo brokerInfo =
- new BrokerInfo(clientId, request.getEnableTls(),
- request.hasTlsPort() ? request.getTlsPort() : TBaseConstants.META_DEFAULT_BROKER_TLS_PORT);
- BrokerConfEntity brokerConfEntity =
- defMetaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
- if (brokerConfEntity == null) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("No broker configure info, please create first! the connecting client id is:")
- .append(clientId).toString());
- return builder.build();
- }
- if ((!brokerInfo.getHost().equals(brokerConfEntity.getBrokerIp()))
- || (brokerInfo.getPort() != brokerConfEntity.getBrokerPort())) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("Inconsistent broker configure,please confirm first! the connecting client id is:")
- .append(clientId).append(", the configure's broker address by brokerId is:")
- .append(brokerConfEntity.getBrokerIdAndAddress()).toString());
- return builder.build();
- }
- int confTLSPort = brokerConfEntity.getBrokerTLSPort();
- if (confTLSPort != brokerInfo.getTlsPort()) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("Inconsistent TLS configure,please confirm first! the connecting client id is:")
- .append(clientId).append(", the configured TLS port is:")
- .append(confTLSPort).append(", the broker reported TLS port is ")
- .append(brokerInfo.getTlsPort()).toString());
- return builder.build();
- }
- if (brokerConfEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("Broker's configure not online, please online configure first! " +
- "the connecting client id is:")
- .append(clientId).toString());
- return builder.build();
- }
// get optional filed
- boolean needFastStart = false;
+ ClusterSettingEntity defSetting =
+ defMetaDataManager.getClusterDefSetting(false);
final long reFlowCtrlId = request.hasFlowCheckId()
? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
final int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
- ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
- this.defMetaDataManager.getBrokerRunSyncManageMap();
- // update broker status
- Map<String, String> brokerTopicSetConfInfo =
- this.defMetaDataManager.getBrokerTopicStrConfigInfo(brokerConfEntity, strBuffer);
- List<String> brokerTopicSetConfInfoList =
- new ArrayList<>(brokerTopicSetConfInfo.size());
- for (String topicItem : brokerTopicSetConfInfo.values()) {
- brokerTopicSetConfInfoList.add(topicItem);
- }
- BrokerSyncStatusInfo brokerStatusInfo =
- new BrokerSyncStatusInfo(brokerConfEntity, brokerTopicSetConfInfoList);
- brokerSyncStatusMap.put(brokerConfEntity.getBrokerId(), brokerStatusInfo);
- brokerStatusInfo.updateCurrBrokerConfInfo(brokerConfEntity.getManageStatus().getCode(),
- brokerConfEntity.isConfDataUpdated(), brokerConfEntity.isBrokerLoaded(),
- brokerConfEntity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, false);
- if (brokerTopicSetConfInfoList.isEmpty()) {
- needFastStart = true;
- }
- brokerStatusInfo.setFastStart(needFastStart);
- // set broker report info
- if (request.getCurBrokerConfId() <= 0) {
- brokerStatusInfo.setBrokerReportInfo(true, request.getCurBrokerConfId(),
- request.getConfCheckSumId(), true, request.getBrokerDefaultConfInfo(),
- request.getBrokerTopicSetConfInfoList(), true, request.getBrokerOnline(), overtls);
- } else {
- brokerStatusInfo.setBrokerReportInfo(true,
- brokerStatusInfo.getLastPushBrokerConfId(),
- brokerStatusInfo.getLastPushBrokerCheckSumId(), true,
- brokerConfEntity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, true,
- request.getBrokerOnline(), overtls);
- }
- this.defMetaDataManager.removeBrokerRunTopicInfoMap(brokerInfo.getBrokerId());
- brokerHolder.setBrokerInfo(brokerInfo.getBrokerId(), brokerInfo);
- heartbeatManager.regBrokerNode(String.valueOf(brokerInfo.getBrokerId()));
+ int tlsPort = request.hasTlsPort()
+ ? request.getTlsPort() : defSetting.getBrokerTLSPort();
+ // build broker info
+ BrokerInfo brokerInfo =
+ new BrokerInfo(clientId, request.getEnableTls(), tlsPort);
+ // register broker run status info
+ if (!brokerRunManager.brokerRegister2M(clientId, brokerInfo,
+ request.getCurBrokerConfId(), request.getConfCheckSumId(),
+ true, request.getBrokerDefaultConfInfo(),
+ request.getBrokerTopicSetConfInfoList(), request.getBrokerOnline(),
+ overtls, strBuffer, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrInfo());
+ return builder.build();
+ }
+ // print broker register log
logger.info(strBuffer.append("[Broker Register] ").append(clientId)
.append(" report, configureId=").append(request.getCurBrokerConfId())
.append(",readStatusRpt=").append(request.getReadStatusRpt())
@@ -1056,9 +994,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",qryPriorityId=").append(qryPriorityId)
.append(",checksumId=").append(request.getConfCheckSumId()).toString());
strBuffer.delete(0, strBuffer.length());
- if (request.getCurBrokerConfId() > 0) {
- processBrokerReportConfigureInfo(brokerInfo, strBuffer);
- }
// response
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1074,11 +1009,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
enableInfo.setEnableConsumeAuthenticate(masterConfig.isStartConsumeAuthenticate());
enableInfo.setEnableConsumeAuthorize(masterConfig.isStartConsumeAuthorize());
builder.setEnableBrokerInfo(enableInfo);
- builder.setTakeConfInfo(true);
- builder.setCurBrokerConfId(brokerStatusInfo.getLastPushBrokerConfId());
- builder.setConfCheckSumId(brokerStatusInfo.getLastPushBrokerCheckSumId());
- builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
- builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
+ brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuffer, builder);
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
buildClusterConfig(request.getClsConfig());
@@ -1086,8 +1017,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setClsConfig(clusterConfigBuilder);
}
if (request.hasFlowCheckId()) {
- ClusterSettingEntity defSetting =
- defMetaDataManager.getClusterDefSetting(false);
builder.setQryPriorityId(defSetting.getQryPriorityId());
builder.setFlowCheckId(defSetting.getSerialId());
if (reFlowCtrlId != defSetting.getSerialId()) {
@@ -1098,16 +1027,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
}
- logger.info(strBuffer.append("[TMaster sync] push broker configure: brokerId = ")
- .append(brokerStatusInfo.getBrokerId())
- .append(",configureId=").append(brokerStatusInfo.getLastPushBrokerConfId())
- .append(",stopWrite=").append(builder.getStopWrite())
- .append(",stopRead=").append(builder.getStopRead())
- .append(",checksumId=").append(brokerStatusInfo.getLastPushBrokerCheckSumId())
- .append(",default configure is ").append(brokerStatusInfo.getLastPushBrokerDefaultConfInfo())
- .append(",topic configure is ").append(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo())
- .toString());
- strBuffer.delete(0, strBuffer.length());
logger.info(strBuffer.append("[Broker Register] ").append(clientId)
.append(", isOverTLS=").append(overtls).toString());
return builder.build();
@@ -1156,73 +1075,23 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(paramCheckResult.errMsg);
return builder.build();
}
- final String brokerId = (String) paramCheckResult.checkData;
- checkNodeStatus(brokerId, strBuffer);
- BrokerInfo brokerInfo = brokerHolder.getBrokerInfo(Integer.parseInt(brokerId));
- if (brokerInfo == null) {
- builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer
- .append("Please register broker first! the connecting client id is:")
- .append(brokerId).toString());
- return builder.build();
- }
- BrokerConfEntity brokerConfigEntry =
- defMetaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
- if (brokerConfigEntry == null) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("No broker configure info, please create first! the connecting client id is:")
- .append(brokerInfo.toString()).toString());
- return builder.build();
- }
- if (brokerConfigEntry.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("Broker's configure not online, " +
- "please online configure first! the connecting client id is:")
- .append(brokerInfo.toString()).toString());
- return builder.build();
- }
- ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
- this.defMetaDataManager.getBrokerRunSyncManageMap();
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerSyncStatusMap.get(brokerInfo.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
- builder.setErrMsg(strBuffer
- .append("Not found Broker run status info,please register first! the connecting client id is:")
- .append(brokerInfo.toString()).toString());
- return builder.build();
- }
- // update heartbeat
- try {
- heartbeatManager.updBrokerNode(brokerId);
- } catch (HeartbeatException e) {
- builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(e.getMessage());
- return builder.build();
- }
-
- // update broker status
- brokerSyncStatusInfo.setBrokerReportInfo(false, request.getCurBrokerConfId(),
- request.getConfCheckSumId(), request.getTakeConfInfo(),
- request.getBrokerDefaultConfInfo(), request.getBrokerTopicSetConfInfoList(),
- true, request.getBrokerOnline(), overtls);
- processBrokerReportConfigureInfo(brokerInfo, strBuffer);
- if (request.getTakeRemovedTopicInfo()) {
- List<String> removedTopics = request.getRemovedTopicsInfoList();
- logger.info(strBuffer.append("[Broker Report] receive broker confirmed removed topic list is ")
- .append(removedTopics.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- defMetaDataManager.clearRmvedTopicConfInfo(
- brokerConfigEntry.getBrokerId(), removedTopics, strBuffer, result);
- }
- brokerHolder.updateBrokerReportStatus(brokerInfo.getBrokerId(),
- request.getReadStatusRpt(), request.getWriteStatusRpt());
+ int brokerId = (int) paramCheckResult.checkData;
long reFlowCtrlId = request.hasFlowCheckId()
? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
+ checkNodeStatus(String.valueOf(brokerId), strBuffer);
+ if (!brokerRunManager.brokerHeartBeat2M(brokerId,
+ request.getCurBrokerConfId(), request.getConfCheckSumId(),
+ request.getTakeConfInfo(), request.getBrokerDefaultConfInfo(),
+ request.getBrokerTopicSetConfInfoList(), request.getTakeRemovedTopicInfo(),
+ request.getRemovedTopicsInfoList(), request.getReadStatusRpt(),
+ request.getWriteStatusRpt(), request.getBrokerOnline(),
+ strBuffer, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrInfo());
+ return builder.build();
+ }
if (request.getTakeConfInfo()) {
strBuffer.append("[Broker Report] heartbeat report: brokerId=")
.append(request.getBrokerId()).append(", configureId=")
@@ -1235,15 +1104,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",qryPriorityId=").append(qryPriorityId)
.append(",brokerOnline=").append(request.getBrokerOnline())
.append(",default broker configure is ").append(request.getBrokerDefaultConfInfo())
- .append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList())
- .append(",current brokerSyncStatusInfo is ");
- logger.info(brokerSyncStatusInfo.toJsonString(strBuffer, true).toString());
+ .append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList());
strBuffer.delete(0, strBuffer.length());
}
// create response
- builder.setNeedReportData(brokerSyncStatusInfo.needReportData());
- builder.setCurBrokerConfId(brokerSyncStatusInfo.getLastPushBrokerConfId());
- builder.setConfCheckSumId(brokerSyncStatusInfo.getLastPushBrokerCheckSumId());
+ brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuffer, builder);
+ BrokerConfEntity brokerConfEntity =
+ defMetaDataManager.getBrokerConfByBrokerId(brokerId);
+ builder.setTakeRemoveTopicInfo(true);
+ builder.addAllRemoveTopicConfInfo(defMetaDataManager
+ .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuffer).values());
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.hasFlowCheckId()) {
ClusterSettingEntity defSetting =
@@ -1258,36 +1128,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
}
- brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), builder);
ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
buildClusterConfig(request.getClsConfig());
if (clusterConfigBuilder != null) {
builder.setClsConfig(clusterConfigBuilder);
}
- builder.setTakeRemoveTopicInfo(true);
- builder.addAllRemoveTopicConfInfo(defMetaDataManager
- .getBrokerRemovedTopicStrConfigInfo(brokerConfigEntry, strBuffer).values());
- if (brokerSyncStatusInfo.needSyncConfDataToBroker()) {
- builder.setTakeConfInfo(true);
- builder.setBrokerDefaultConfInfo(brokerSyncStatusInfo
- .getLastPushBrokerDefaultConfInfo());
- builder.addAllBrokerTopicSetConfInfo(brokerSyncStatusInfo
- .getLastPushBrokerTopicSetConfInfo());
- logger.info(strBuffer
- .append("[Broker Report] heartbeat sync topic config: brokerId=")
- .append(brokerId).append(", configureId=")
- .append(brokerSyncStatusInfo.getLastPushBrokerConfId())
- .append(",set flowCtrlId=").append(builder.getFlowCheckId())
- .append(",stopWrite=").append(builder.getStopWrite())
- .append(",stopRead=").append(builder.getStopRead())
- .append(",qryPriorityId=").append(builder.getQryPriorityId())
- .append(",checksumId=").append(brokerSyncStatusInfo.getLastPushBrokerCheckSumId())
- .append(",default configure is ")
- .append(brokerSyncStatusInfo.getLastPushBrokerDefaultConfInfo())
- .append(",topic configure is ")
- .append(brokerSyncStatusInfo.getLastPushBrokerTopicSetConfInfo())
- .toString());
- }
// begin: deprecated when brokers version equal to current master version
builder.setAuthorizedInfo(genAuthorizedInfo(null, true));
// end deprecated
@@ -1311,14 +1156,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public CloseResponseM2B brokerCloseClientB2M(CloseRequestB2M request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder();
builder.setSuccess(false);
- CertifiedResult result =
+ CertifiedResult cfResult =
serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo());
- if (!result.result) {
- builder.setErrCode(result.errCode);
- builder.setErrMsg(result.errInfo);
+ if (!cfResult.result) {
+ builder.setErrCode(cfResult.errCode);
+ builder.setErrMsg(cfResult.errInfo);
return builder.build();
}
ParamCheckResult paramCheckResult =
@@ -1328,12 +1174,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(paramCheckResult.errMsg);
return builder.build();
}
- final String brokerId = (String) paramCheckResult.checkData;
- checkNodeStatus(brokerId, strBuffer);
- logger.info(strBuffer.append("[Broker Closed]").append(brokerId)
- .append(", isOverTLS=").append(overtls).toString());
- new ReleaseBroker().run(brokerId);
- heartbeatManager.unRegBrokerNode(request.getBrokerId());
+ final int brokerId = (int) paramCheckResult.checkData;
+ checkNodeStatus(String.valueOf(brokerId), strBuffer);
+ if (!brokerRunManager.brokerClose2M(brokerId, strBuffer, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrInfo());
+ return builder.build();
+ }
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
builder.setErrMsg("OK!");
@@ -1359,482 +1206,18 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @return
*/
private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
- Map<String, String> topicPartStrMap = new HashMap<>();
- ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
+ ProducerInfo producerInfo =
+ producerHolder.getProducerInfo(producerId);
if (producerInfo == null) {
- return topicPartStrMap;
+ return new HashMap<>();
}
Set<String> producerInfoTopicSet =
producerInfo.getTopicSet();
if ((producerInfoTopicSet == null)
|| (producerInfoTopicSet.isEmpty())) {
- return topicPartStrMap;
- }
- Map<String, StringBuilder> topicPartStrBuilderMap =
- new HashMap<>();
- for (String topic : producerInfoTopicSet) {
- if (topic == null) {
- continue;
- }
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
- topicPSInfoManager.getBrokerPubInfo(topic);
- if (topicInfoMap == null) {
- continue;
- }
- for (Map.Entry<BrokerInfo, TopicInfo> entry : topicInfoMap.entrySet()) {
- if (entry.getKey() == null || entry.getValue() == null) {
- continue;
- }
- if (entry.getValue().isAcceptPublish()) {
- StringBuilder tmpValue = topicPartStrBuilderMap.get(topic);
- if (tmpValue == null) {
- StringBuilder strBuffer =
- new StringBuilder(512).append(topic)
- .append(TokenConstants.SEGMENT_SEP)
- .append(entry.getValue().getSimpleValue());
- topicPartStrBuilderMap.put(topic, strBuffer);
- } else {
- tmpValue.append(TokenConstants.ARRAY_SEP)
- .append(entry.getValue().getSimpleValue());
- }
- }
- }
- }
- for (Map.Entry<String, StringBuilder> entry : topicPartStrBuilderMap.entrySet()) {
- if (entry.getValue() != null) {
- topicPartStrMap.put(entry.getKey(), entry.getValue().toString());
- }
- }
- topicPartStrBuilderMap.clear();
- return topicPartStrMap;
- }
-
- /**
- * Update topics
- *
- * @param brokerInfo
- * @param strBuffer
- * @param curTopicInfoMap
- * @param newTopicInfoMap
- * @param requirePartUpdate
- * @param requireAcceptPublish
- * @param requireAcceptSubscribe
- */
- private void updateTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
- Map<String/* topicName */, TopicInfo> curTopicInfoMap,
- Map<String/* topicName */, TopicInfo> newTopicInfoMap,
- boolean requirePartUpdate, boolean requireAcceptPublish,
- boolean requireAcceptSubscribe) {
- List<TopicInfo> needAddTopicList = new ArrayList<>();
- for (Map.Entry<String, TopicInfo> entry : newTopicInfoMap.entrySet()) {
- TopicInfo newTopicInfo = entry.getValue();
- TopicInfo oldTopicInfo = null;
- if (curTopicInfoMap != null) {
- oldTopicInfo = curTopicInfoMap.get(entry.getKey());
- }
- if (oldTopicInfo == null
- || oldTopicInfo.getPartitionNum() != newTopicInfo.getPartitionNum()
- || oldTopicInfo.getTopicStoreNum() != newTopicInfo.getTopicStoreNum()
- || oldTopicInfo.isAcceptPublish() != newTopicInfo.isAcceptPublish()
- || oldTopicInfo.isAcceptSubscribe() != newTopicInfo.isAcceptSubscribe()) {
- if (requirePartUpdate) {
- if (!requireAcceptPublish) {
- newTopicInfo.setAcceptPublish(false);
- }
- if (!requireAcceptSubscribe) {
- newTopicInfo.setAcceptSubscribe(false);
- }
- }
- needAddTopicList.add(newTopicInfo);
- }
- }
- updateTopicsInternal(brokerInfo, needAddTopicList, EventType.CONNECT);
- logger.info(strBuffer.append("[addedTopicConfigMap] broker:")
- .append(brokerInfo.toString()).append(" add topicInfo list:")
- .append(needAddTopicList).toString());
- strBuffer.delete(0, strBuffer.length());
- }
-
- /**
- * Delete topics
- *
- * @param brokerInfo
- * @param strBuffer
- * @param curTopicInfoMap
- * @param newTopicInfoMap
- */
- private void deleteTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
- Map<String/* topicName */, TopicInfo> curTopicInfoMap,
- Map<String/* topicName */, TopicInfo> newTopicInfoMap) {
- List<TopicInfo> needRmvTopicList = new ArrayList<>();
- if (curTopicInfoMap != null) {
- for (Map.Entry<String, TopicInfo> entry : curTopicInfoMap.entrySet()) {
- if (newTopicInfoMap.get(entry.getKey()) == null) {
- needRmvTopicList.add(entry.getValue());
- }
- }
- }
- updateTopicsInternal(brokerInfo, needRmvTopicList, EventType.DISCONNECT);
- logger.info(strBuffer.append("[removedTopicConfigMap] broker:")
- .append(brokerInfo.toString()).append(" removed topicInfo list:")
- .append(needRmvTopicList).toString());
- strBuffer.delete(0, strBuffer.length());
- }
-
- /**
- * Process broker report configure info
- *
- * @param brokerInfo
- * @param strBuffer
- */
- private void processBrokerReportConfigureInfo(BrokerInfo brokerInfo,
- final StringBuilder strBuffer) {
- ProcessResult result = new ProcessResult();
- // #lizard forgives
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.defMetaDataManager.getBrokerRunSyncStatusInfo(brokerInfo.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- logger.error(strBuffer
- .append("Fail to find broker run manage configure! broker is ")
- .append(brokerInfo.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- return;
- }
- boolean requireAcceptPublish = false;
- boolean requireAcceptSubscribe = false;
- boolean requirePartUpdate = false;
- boolean requireSyncClient = false;
- int brokerManageStatus = brokerSyncStatusInfo.getBrokerManageStatus();
- int brokerRunStatus = brokerSyncStatusInfo.getBrokerRunStatus();
- long subStepOpTimeInMills = brokerSyncStatusInfo.getSubStepOpTimeInMills();
- boolean isBrokerRegister = brokerSyncStatusInfo.isBrokerRegister();
- boolean isBrokerOnline = brokerSyncStatusInfo.isBrokerOnline();
- if (!isBrokerRegister) {
- return;
- }
- if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) {
- if (isBrokerOnline) {
- if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
- return;
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_REGISTER
- || brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE) {
- brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ) {
- if ((brokerSyncStatusInfo.isBrokerConfChanged())
- || (!brokerSyncStatusInfo.isBrokerLoaded())) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? masterConfig.getStepChgWaitPeriodMs()
- : masterConfig.getOnlineOnlyReadToRWPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
- requireAcceptPublish = true;
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- }
- } else {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
- requireAcceptPublish = true;
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- }
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
- brokerSyncStatusInfo.setFastStart(true);
- requireAcceptPublish = true;
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- if ((brokerSyncStatusInfo.isBrokerConfChanged())
- || (!brokerSyncStatusInfo.isBrokerLoaded())) {
- defMetaDataManager.updateBrokerConfChanged(brokerInfo.getBrokerId(),
- false, true, strBuffer, result);
- }
- }
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_REGISTER) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_ONLINE);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- requirePartUpdate = true;
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_ONLINE) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_PART_ONLY_READ);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- requirePartUpdate = true;
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_PART_ONLY_READ) {
- if ((brokerSyncStatusInfo.isBrokerConfChanged())
- || (!brokerSyncStatusInfo.isBrokerLoaded())) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart()
- ? 0 : masterConfig.getOnlineOnlyReadToRWPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
- requireAcceptPublish = true;
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- requirePartUpdate = true;
- }
- } else {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_READ_AND_WRITE);
- requireAcceptPublish = true;
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- requirePartUpdate = true;
- }
- }
- } else {
- if (brokerRunStatus != TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_ONLINE);
- requireSyncClient = true;
- }
- }
- } else {
- if (!isBrokerOnline
- || brokerRunStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
- return;
- }
- if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- }
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart()
- ? masterConfig.getStepChgWaitPeriodMs()
- : masterConfig.getOfflineOnlyReadToRWPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
- brokerSyncStatusInfo.setFastStart(true);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- }
- }
- } else if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
- requireAcceptPublish = true;
- requireSyncClient = true;
- }
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? masterConfig.getStepChgWaitPeriodMs()
- : masterConfig.getOfflineOnlyReadToRWPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
- brokerSyncStatusInfo.setFastStart(true);
- requireAcceptPublish = true;
- requireSyncClient = true;
- }
- }
- } else if (brokerManageStatus == TStatusConstants.STATUS_MANAGE_OFFLINE) {
- if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE);
- requireAcceptSubscribe = true;
- requireSyncClient = true;
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE) {
- long waitTime =
- brokerSyncStatusInfo.isFastStart() ? 0 : masterConfig.getStepChgWaitPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo
- .setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE);
- requireSyncClient = true;
- }
- } else if (brokerRunStatus == TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE) {
- long waitTime = brokerSyncStatusInfo.isFastStart()
- ? masterConfig.getStepChgWaitPeriodMs()
- : masterConfig.getOfflineOnlyReadToRWPeriodMs();
- if ((System.currentTimeMillis() - subStepOpTimeInMills) > waitTime) {
- brokerSyncStatusInfo.setBrokerRunStatus(TStatusConstants.STATUS_SERVICE_UNDEFINED);
- brokerSyncStatusInfo.setFastStart(true);
- requireSyncClient = true;
- }
- }
- }
- }
-
- if (requireSyncClient) {
- updateTopicInfoToClient(brokerInfo, requirePartUpdate,
- requireAcceptPublish, requireAcceptSubscribe, strBuffer);
- }
- }
-
- /**
- * Update topic info to client
- *
- * @param brokerInfo
- * @param requirePartUpdate
- * @param requireAcceptPublish
- * @param requireAcceptSubscribe
- * @param strBuffer
- */
- private void updateTopicInfoToClient(BrokerInfo brokerInfo,
- boolean requirePartUpdate,
- boolean requireAcceptPublish,
- boolean requireAcceptSubscribe,
- final StringBuilder strBuffer) {
- // #lizard forgives
- // check broker status
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.defMetaDataManager.getBrokerRunSyncStatusInfo(brokerInfo.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- logger.error(strBuffer
- .append("Fail to find broker run manage configure, not update topic info! broker is ")
- .append(brokerInfo.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- return;
- }
- // get broker config and then generate topic status record
- String brokerDefaultConfInfo = brokerSyncStatusInfo.getReportedBrokerDefaultConfInfo();
- int brokerManageStatusId = brokerSyncStatusInfo.getBrokerManageStatus();
- if (TStringUtils.isBlank(brokerDefaultConfInfo)) {
- return;
- }
- // get broker status and topic default config
- boolean acceptPublish = false;
- boolean acceptSubscribe = false;
- if (brokerManageStatusId >= TStatusConstants.STATUS_MANAGE_ONLINE) {
- if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE) {
- acceptPublish = true;
- acceptSubscribe = true;
- } else if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) {
- acceptSubscribe = true;
- } else if (brokerManageStatusId == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- acceptPublish = true;
- }
- }
- List<String> brokerTopicSetConfInfo =
- brokerSyncStatusInfo.getReportedBrokerTopicSetConfInfo();
- String[] brokerDefaultConfInfoArr = brokerDefaultConfInfo.split(TokenConstants.ATTR_SEP);
- int numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
- boolean cfgAcceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
- boolean cfgAcceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
- int numTopicStores = 1;
- if (brokerDefaultConfInfoArr.length > 7) {
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
- numTopicStores = Integer.parseInt(brokerDefaultConfInfoArr[7]);
- }
- }
- int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
- if (brokerDefaultConfInfoArr.length > 8) {
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
- unFlushDataHold = Integer.parseInt(brokerDefaultConfInfoArr[8]);
- }
- }
- ConcurrentHashMap<String/* topic */, TopicInfo> newTopicInfoMap =
- new ConcurrentHashMap<>();
- // according to broker status and default config, topic config, make up current status record
- for (String strTopicConfInfo : brokerTopicSetConfInfo) {
- if (TStringUtils.isBlank(strTopicConfInfo)) {
- continue;
- }
- String[] topicConfInfoArr =
- strTopicConfInfo.split(TokenConstants.ATTR_SEP);
- final String tmpTopic = topicConfInfoArr[0];
- int tmpPartNum = numPartitions;
- if (!TStringUtils.isBlank(topicConfInfoArr[1])) {
- tmpPartNum = Integer.parseInt(topicConfInfoArr[1]);
- }
- boolean tmpAcceptPublish = cfgAcceptPublish;
- if (!TStringUtils.isBlank(topicConfInfoArr[2])) {
- tmpAcceptPublish = Boolean.parseBoolean(topicConfInfoArr[2]);
- }
- if (!acceptPublish) {
- tmpAcceptPublish = acceptPublish;
- } else {
- if (!requirePartUpdate) {
- if (!requireAcceptPublish) {
- tmpAcceptPublish = false;
- }
- }
- }
- int tmpNumTopicStores = numTopicStores;
- if (!TStringUtils.isBlank(topicConfInfoArr[8])) {
- tmpNumTopicStores = Integer.parseInt(topicConfInfoArr[8]);
- tmpNumTopicStores = tmpNumTopicStores > 0 ? tmpNumTopicStores : numTopicStores;
- }
- boolean tmpAcceptSubscribe = cfgAcceptSubscribe;
- if (!TStringUtils.isBlank(topicConfInfoArr[3])) {
- tmpAcceptSubscribe = Boolean.parseBoolean(topicConfInfoArr[3]);
- }
- if (!acceptSubscribe) {
- tmpAcceptSubscribe = acceptSubscribe;
- } else {
- if (!requirePartUpdate) {
- if (!requireAcceptSubscribe) {
- tmpAcceptSubscribe = false;
- }
- }
- }
- newTopicInfoMap.put(tmpTopic, new TopicInfo(brokerInfo, tmpTopic,
- tmpPartNum, tmpNumTopicStores, tmpAcceptPublish, tmpAcceptSubscribe));
- }
-
- ConcurrentHashMap<String/* topicName */, TopicInfo> oldTopicInfoMap =
- defMetaDataManager.getBrokerRunTopicInfoMap(brokerInfo.getBrokerId());
- deleteTopics(brokerInfo, strBuffer, oldTopicInfoMap, newTopicInfoMap);
- updateTopics(brokerInfo, strBuffer, oldTopicInfoMap, newTopicInfoMap,
- requirePartUpdate, requireAcceptPublish, requireAcceptSubscribe);
- defMetaDataManager.updateBrokerRunTopicInfoMap(brokerInfo.getBrokerId(), newTopicInfoMap);
- }
-
- /**
- * Update topic internal
- *
- * @param broker
- * @param topicList
- * @param type
- */
- private void updateTopicsInternal(BrokerInfo broker,
- List<TopicInfo> topicList,
- EventType type) {
- List<TopicInfo> cloneTopicList = new ArrayList<>();
- for (TopicInfo topicInfo : topicList) {
- cloneTopicList.add(topicInfo.clone());
- }
- for (TopicInfo topicInfo : cloneTopicList) {
- Integer lid = null;
- try {
- lid = this.masterRowLock.getLock(null, StringUtils.getBytesUtf8(topicInfo.getTopic()), true);
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
- topicPSInfoManager.getBrokerPubInfo(topicInfo.getTopic());
- if (topicInfoMap == null) {
- topicInfoMap = new ConcurrentHashMap<>();
- topicPSInfoManager.setBrokerPubInfo(topicInfo.getTopic(), topicInfoMap);
- }
- if (EventType.CONNECT == type) {
- topicInfoMap.put(broker, topicInfo);
- } else {
- topicInfoMap.remove(broker);
- }
- } catch (IOException e) {
- logger.error("Get lock error!", e);
- } finally {
- if (lid != null) {
- this.masterRowLock.releaseRowLock(lid);
- }
- }
+ return new HashMap<>();
}
+ return brokerRunManager.getPubBrokerAcceptPubPartInfo(producerInfoTopicSet);
}
@Override
@@ -1945,13 +1328,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// choose different load balance strategy
if (isFirstReb) {
finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
- topicPSInfoManager, groups, defMetaDataManager,
+ brokerRunManager, groups, defMetaDataManager,
masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
} else {
finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
- consumerHolder, brokerHolder, topicPSInfoManager, groups,
- defMetaDataManager, masterConfig.getMaxGroupBrokerConsumeRate(),
- strBuffer);
+ consumerHolder, brokerRunManager, groups, defMetaDataManager,
+ masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer);
}
// allocate partitions to consumers
for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
@@ -2079,11 +1461,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// choose different load balance strategy
if (isFirstReb) {
finalSubInfoMap = this.loadBalancer.resetBukAssign(consumerHolder,
- topicPSInfoManager, groups, this.zkOffsetStorage,
+ brokerRunManager, groups, this.zkOffsetStorage,
this.defMetaDataManager, strBuffer);
} else {
finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
- consumerHolder, topicPSInfoManager, groups, this.zkOffsetStorage,
+ consumerHolder, brokerRunManager, groups, this.zkOffsetStorage,
this.defMetaDataManager, strBuffer);
}
// filter
@@ -2419,8 +1801,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return topicPSInfoManager;
}
- public BrokerInfoHolder getBrokerHolder() {
- return brokerHolder;
+ public BrokerAbnHolder getBrokerAbnHolder() {
+ return brokerRunManager.getBrokerAbnHolder();
}
public ProducerInfoHolder getProducerHolder() {
@@ -2497,22 +1879,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
- private class ReleaseBroker extends AbstractReleaseRunner {
- @Override
- void run(String arg) {
- int brokerId = Integer.parseInt(arg);
- BrokerInfo broker = brokerHolder.removeBroker(brokerId);
- if (broker != null) {
- List<TopicInfo> topicInfoList =
- topicPSInfoManager.getBrokerPubInfoList(broker);
- if (topicInfoList != null) {
- updateTopicsInternal(broker, topicInfoList, EventType.DISCONNECT);
- }
- defMetaDataManager.resetBrokerReportInfo(broker.getBrokerId());
- }
- }
- }
-
private class ReleaseProducer extends AbstractReleaseRunner {
@Override
void run(String clientId) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
index dd39f7a..4f67e84 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -38,8 +38,7 @@ import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResCtrlEntity;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.NodeRebInfo;
@@ -47,6 +46,8 @@ import org.apache.tubemq.server.master.nodemanage.nodeconsumer.RebProcessInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
/* Load balance class for server side load balance, (partition size) mod (consumer size) */
public class DefaultLoadBalancer implements LoadBalancer {
private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
@@ -61,8 +62,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
*
* @param clusterState
* @param consumerHolder
- * @param brokerHolder
- * @param topicPSInfoManager
+ * @param brokerRunManager
* @param groupSet
* @param metaDataManager
* @param defAllowBClientRate
@@ -73,8 +73,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
public Map<String, Map<String, List<Partition>>> balanceCluster(
Map<String, Map<String, Map<String, Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
- BrokerInfoHolder brokerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groupSet,
MetaDataManager metaDataManager,
int defAllowBClientRate,
@@ -129,7 +128,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
int allowRate = confAllowBClientRate > 0
? confAllowBClientRate : defAllowBClientRate;
- int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(topicSet);
+ int maxBrokerCount =
+ brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
int curBClientRate = (int) Math.floor(maxBrokerCount / newConsumerList.size());
if (curBClientRate > allowRate) {
int minClientCnt = maxBrokerCount / allowRate;
@@ -160,7 +160,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
}
List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
- Map<String, Partition> psMap = topicPSInfoManager.getPartitionMap(topicSet);
+ Map<String, Partition> partMap =
+ brokerRunManager.getSubBrokerAcceptSubParts(topicSet);
Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
for (ConsumerInfo consumer : newConsumerList) {
Map<String, List<Partition>> partitions = new HashMap<>();
@@ -188,7 +189,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, Partition> partitionMap = entry.getValue();
if (partitionMap != null && !partitionMap.isEmpty()) {
for (Partition partition : partitionMap.values()) {
- Partition curPart = psMap.remove(partition.getPartitionKey());
+ Partition curPart = partMap.remove(partition.getPartitionKey());
if (curPart != null) {
ps.add(curPart);
}
@@ -198,10 +199,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
}
// random allocate
- if (psMap.size() > 0) {
+ if (partMap.size() > 0) {
onlineOfflineGroupSet.add(group);
if (!newConsumerList2.isEmpty()) {
- this.randomAssign(psMap, newConsumerList2,
+ this.randomAssign(partMap, newConsumerList2,
finalSubInfoMap, clusterState, rebProcessInfo.needProcessList);
}
}
@@ -227,7 +228,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
if (groupsNeedToBalance.size() > 0) {
finalSubInfoMap =
- balance(finalSubInfoMap, consumerHolder, topicPSInfoManager,
+ balance(finalSubInfoMap, consumerHolder, brokerRunManager,
groupsNeedToBalance, clusterState, rejGroupClientINfoMap);
}
if (!rejGroupClientINfoMap.isEmpty()) {
@@ -244,7 +245,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
private Map<String, Map<String, List<Partition>>> balance(
Map<String, Map<String, List<Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groupSet,
Map<String, Map<String, Map<String, Partition>>> oldClusterState,
Map<String, RebProcessInfo> rejGroupClientInfoMap) {
@@ -287,7 +288,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
// sort consumer and partitions, then mod
Set<String> topics = consumerBandInfo.getTopicSet();
- Map<String, Partition> psPartMap = topicPSInfoManager.getPartitionMap(topics);
+ Map<String, Partition> psPartMap =
+ brokerRunManager.getSubBrokerAcceptSubParts(topics);
int min = psPartMap.size() / consumerList.size();
int max = psPartMap.size() % consumerList.size() == 0 ? min : min + 1;
int serverNumToLoadMax = psPartMap.size() % consumerList.size();
@@ -482,7 +484,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
* Assign consumer partitions
*
* @param consumerHolder
- * @param topicPSInfoManager
+ * @param brokerRunManager
* @param groupSet
* @param metaDataManager
* @param defAllowBClientRate
@@ -492,7 +494,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
@Override
public Map<String, Map<String, List<Partition>>> bukAssign(
ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groupSet,
MetaDataManager metaDataManager,
int defAllowBClientRate,
@@ -529,7 +531,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
? offsetResetGroupEntity.getAllowedBrokerClientRate() : -2;
int allowRate = confAllowBClientRate > 0
? confAllowBClientRate : defAllowBClientRate;
- int maxBrokerCount = topicPSInfoManager.getTopicMaxBrokerCount(topicSet);
+ int maxBrokerCount =
+ brokerRunManager.getSubTopicMaxBrokerCount(topicSet);
int curBClientRate = (int) Math.floor(maxBrokerCount / consumerList.size());
if (curBClientRate > allowRate) {
int minClientCnt = maxBrokerCount / allowRate;
@@ -555,7 +558,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
// sort and mod
Collections.sort(consumerList);
for (String topic : topicSet) {
- List<Partition> partPubList = topicPSInfoManager.getPartitionList(topic);
+ List<Partition> partPubList =
+ brokerRunManager.getSubBrokerAcceptSubParts(topic);
Collections.sort(partPubList);
int partsPerConsumer = partPubList.size() / consumerList.size();
int consumersWithExtraPart = partPubList.size() % consumerList.size();
@@ -588,7 +592,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
* Reset
*
* @param consumerHolder
- * @param topicPSInfoManager
+ * @param brokerRunManager
* @param groupSet
* @param zkOffsetStorage
* @param metaDataManager
@@ -597,11 +601,11 @@ public class DefaultLoadBalancer implements LoadBalancer {
*/
@Override
public Map<String, Map<String, Map<String, Partition>>> resetBukAssign(
- ConsumerInfoHolder consumerHolder, TopicPSInfoManager topicPSInfoManager,
+ ConsumerInfoHolder consumerHolder, BrokerRunManager brokerRunManager,
List<String> groupSet, OffsetStorage zkOffsetStorage,
MetaDataManager metaDataManager, final StringBuilder strBuffer) {
return inReBalanceCluster(false, consumerHolder,
- topicPSInfoManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
+ brokerRunManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
}
/**
@@ -609,7 +613,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
*
* @param clusterState
* @param consumerHolder
- * @param topicPSInfoManager
+ * @param brokerRunManager
* @param groupSet
* @param zkOffsetStorage
* @param metaDataManager
@@ -619,18 +623,18 @@ public class DefaultLoadBalancer implements LoadBalancer {
@Override
public Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
Map<String, Map<String, Map<String, Partition>>> clusterState,
- ConsumerInfoHolder consumerHolder, TopicPSInfoManager topicPSInfoManager,
+ ConsumerInfoHolder consumerHolder, BrokerRunManager brokerRunManager,
List<String> groupSet, OffsetStorage zkOffsetStorage,
MetaDataManager metaDataManager, final StringBuilder strBuffer) {
return inReBalanceCluster(true, consumerHolder,
- topicPSInfoManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
+ brokerRunManager, groupSet, zkOffsetStorage, metaDataManager, strBuffer);
}
// #lizard forgives
private Map<String, Map<String, Map<String, Partition>>> inReBalanceCluster(
boolean isResetRebalance,
ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groupSet,
OffsetStorage zkOffsetStorage,
MetaDataManager metaDataManager,
@@ -687,12 +691,8 @@ public class DefaultLoadBalancer implements LoadBalancer {
// actual reset offset
Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
- Set<Partition> partPubList =
- topicPSInfoManager.getPartitions(consumerBandInfo.getTopicSet());
- Map<String, Partition> partitionMap = new HashMap<>();
- for (Partition partition : partPubList) {
- partitionMap.put(partition.getPartitionKey(), partition);
- }
+ Map<String, Partition> partitionMap =
+ brokerRunManager.getSubBrokerAcceptSubParts(consumerBandInfo.getTopicSet());
for (Entry<String, String> entry : partsConsumerMap.entrySet()) {
Partition foundPart = partitionMap.get(entry.getKey());
if (foundPart != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
index 98d1204..b17058b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/LoadBalancer.java
@@ -23,8 +23,7 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
@@ -33,8 +32,7 @@ public interface LoadBalancer {
Map<String, Map<String, List<Partition>>> balanceCluster(
Map<String, Map<String, Map<String, Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
- BrokerInfoHolder brokerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groups,
MetaDataManager metaDataManager,
int defAllowBClientRate,
@@ -43,21 +41,21 @@ public interface LoadBalancer {
Map<String, Map<String, Map<String, Partition>>> resetBalanceCluster(
Map<String, Map<String, Map<String, Partition>>> clusterState,
ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groups,
OffsetStorage zkOffsetStorage,
MetaDataManager metaDataManager,
final StringBuilder sBuilder);
Map<String, Map<String, List<Partition>>> bukAssign(ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groups,
MetaDataManager metaDataManager,
int defAllowBClientRate,
final StringBuilder sBuilder);
Map<String, Map<String, Map<String, Partition>>> resetBukAssign(ConsumerInfoHolder consumerHolder,
- TopicPSInfoManager topicPSInfoManager,
+ BrokerRunManager brokerRunManager,
List<String> groups,
OffsetStorage zkOffsetStorage,
MetaDataManager metaDataManager,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index a34a13a..3e1ebc8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -25,28 +25,25 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.Server;
import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
import org.apache.tubemq.server.common.statusdef.TopicStsChgType;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.MasterConfig;
+import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus;
import org.apache.tubemq.server.master.metamanage.metastore.BdbMetaStoreServiceImpl;
import org.apache.tubemq.server.master.metamanage.metastore.MetaStoreService;
@@ -58,7 +55,8 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupResC
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
import org.apache.tubemq.server.master.web.handler.BrokerProcessResult;
import org.apache.tubemq.server.master.web.handler.GroupProcessResult;
import org.apache.tubemq.server.master.web.handler.TopicProcessResult;
@@ -72,34 +70,27 @@ public class MetaDataManager implements Server {
private static final Logger logger =
LoggerFactory.getLogger(MetaDataManager.class);
+ private final TMaster tMaster;
private static final ClusterSettingEntity defClusterSetting =
new ClusterSettingEntity().fillDefaultValue();
private final MasterReplicationConfig replicationConfig;
private final ScheduledExecutorService scheduledExecutorService;
- private final ConcurrentHashMap<Integer, String> brokersMap =
- new ConcurrentHashMap<>();
- private final ConcurrentHashMap<Integer, String> brokersTLSMap =
- new ConcurrentHashMap<>();
-
private final MasterGroupStatus masterGroupStatus = new MasterGroupStatus();
- private ConcurrentHashMap<Integer/* brokerId */, BrokerSyncStatusInfo> brokerRunSyncManageMap =
- new ConcurrentHashMap<>();
- private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, TopicInfo>>
- brokerRunTopicInfoStoreMap = new ConcurrentHashMap<>();
private volatile boolean isStarted = false;
private volatile boolean isStopped = false;
private MetaStoreService metaStoreService;
- private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
- private long lastBrokerUpdatedTime = System.currentTimeMillis();
private long serviceStartTime = System.currentTimeMillis();
- public MetaDataManager(String nodeHost, String metaDataPath,
- MasterReplicationConfig replicationConfig) {
- this.replicationConfig = replicationConfig;
+
+ public MetaDataManager(TMaster tMaster) {
+ this.tMaster = tMaster;
+ MasterConfig masterConfig = this.tMaster.getMasterConfig();
+ this.replicationConfig = masterConfig.getReplicationConfig();
this.metaStoreService =
- new BdbMetaStoreServiceImpl(nodeHost, metaDataPath, replicationConfig);
+ new BdbMetaStoreServiceImpl(masterConfig.getHostName(),
+ masterConfig.getMetaDataPath(), this.replicationConfig);
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -138,40 +129,8 @@ public class MetaDataManager implements Server {
}
}, 0, replicationConfig.getRepStatusCheckTimeoutMs(), TimeUnit.MILLISECONDS);
// initial running data
- StringBuilder sBuffer = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
- Map<Integer, BrokerConfEntity> curBrokerConfInfo =
- this.metaStoreService.getBrokerConfInfo(null);
- for (BrokerConfEntity entity : curBrokerConfInfo.values()) {
- updateBrokerMaps(entity);
- if (entity.getManageStatus().isApplied()) {
- boolean needFastStart = false;
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.brokerRunSyncManageMap.get(entity.getBrokerId());
- Map<String, String> brokerTopicSetConfInfo = getBrokerTopicStrConfigInfo(entity, sBuffer);
- List<String> brokerTopicSetConfInfoList =
- new ArrayList<>(brokerTopicSetConfInfo.size());
- for (String topicItem : brokerTopicSetConfInfo.values()) {
- brokerTopicSetConfInfoList.add(topicItem);
- }
- if (brokerSyncStatusInfo == null) {
- brokerSyncStatusInfo =
- new BrokerSyncStatusInfo(entity, brokerTopicSetConfInfoList);
- BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
- brokerRunSyncManageMap.putIfAbsent(entity.getBrokerId(),
- brokerSyncStatusInfo);
- if (tmpBrokerSyncStatusInfo != null) {
- brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
- }
- }
- if (brokerTopicSetConfInfo.isEmpty()) {
- needFastStart = true;
- }
- brokerSyncStatusInfo.setFastStart(needFastStart);
- brokerSyncStatusInfo.updateCurrBrokerConfInfo(entity.getManageStatus().getCode(),
- entity.isConfDataUpdated(), entity.isBrokerLoaded(),
- entity.getBrokerDefaultConfInfo(), brokerTopicSetConfInfoList, false);
- }
- }
+ BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+ brokerRunManager.updBrokerStaticInfo(this.metaStoreService.getBrokerConfInfo(null));
isStarted = true;
serviceStartTime = System.currentTimeMillis();
logger.info("BrokerConfManager StoreService Started");
@@ -213,14 +172,6 @@ public class MetaDataManager implements Server {
}
}
- public void clearBrokerRunSyncManageData() {
- if (!this.isStarted
- && this.isStopped) {
- return;
- }
- this.brokerRunSyncManageMap.clear();
- }
-
public InetSocketAddress getMasterAddress() {
return metaStoreService.getMasterAddress();
}
@@ -229,20 +180,6 @@ public class MetaDataManager implements Server {
return metaStoreService.getGroupAddressStrInfo();
}
-
-
- public long getBrokerInfoCheckSum() {
- return this.brokerInfoCheckSum.get();
- }
-
- public ConcurrentHashMap<Integer, String> getBrokersMap(boolean isOverTLS) {
- if (isOverTLS) {
- return brokersTLSMap;
- } else {
- return brokersMap;
- }
- }
-
/**
* Check if consume target is authorization or not
*
@@ -472,7 +409,9 @@ public class MetaDataManager implements Server {
if (isAddOp) {
if (metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId()) == null &&
metaStoreService.getBrokerConfByBrokerIp(entity.getBrokerIp()) == null) {
- metaStoreService.addBrokerConf(entity, sBuffer, result);
+ if (metaStoreService.addBrokerConf(entity, sBuffer, result)) {
+ this.tMaster.getBrokerRunManager().updBrokerStaticInfo(entity);
+ }
} else {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
sBuffer.append("Duplicated broker configure record! query index is :")
@@ -495,15 +434,8 @@ public class MetaDataManager implements Server {
entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
entity.getRegionId(), entity.getGroupId(),
entity.getManageStatus(), entity.getTopicProps())) {
- metaStoreService.updBrokerConf(newEntity, sBuffer, result);
- // update broker configure change status
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- getBrokerRunSyncStatusInfo(entity.getBrokerId());
- if (result.isSuccess()) {
- if (brokerSyncStatusInfo != null) {
- updateBrokerConfChanged(entity.getBrokerId(),
- true, true, sBuffer, result);
- }
+ if (metaStoreService.updBrokerConf(newEntity, sBuffer, result)) {
+ triggerBrokerConfDataSync(entity.getBrokerId(), sBuffer, result);
}
} else {
result.setSuccResult(null);
@@ -528,59 +460,6 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
-
- /**
- * Delete broker configure information
- *
- * @param operator operator
- * @param brokerId need deleted broker id
- * @param strBuffer the print information string buffer
- * @param result the process result return
- * @return true if success otherwise false
- */
- public boolean confDelBrokerConfig(String operator,
- int brokerId,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (!metaStoreService.checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- // valid configure status
- if (metaStoreService.hasConfiguredTopics(brokerId)) {
- result.setFailResult(DataOpErrCode.DERR_UNCLEANED.getCode(),
- "The broker's topic configure uncleaned!");
- return result.isSuccess();
- }
- BrokerConfEntity curEntity =
- metaStoreService.getBrokerConfByBrokerId(brokerId);
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- "The broker configure not exist!");
- return result.isSuccess();
- }
- if (curEntity.getManageStatus().isOnlineStatus()) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- "Broker manage status is online, please offline first!");
- return result.isSuccess();
- }
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.brokerRunSyncManageMap.get(curEntity.getBrokerId());
- if (brokerSyncStatusInfo != null) {
- if (brokerSyncStatusInfo.isBrokerRegister()
- && (curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
- && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- "Broker is processing offline event, please wait and try later!");
- return result.isSuccess();
- }
- }
- if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
- this.brokerRunSyncManageMap.remove(brokerId);
- delBrokerRunData(brokerId);
- }
- return result.isSuccess();
- }
-
/**
* Get broker configure information
*
@@ -654,10 +533,8 @@ public class MetaDataManager implements Server {
TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED, newMngStatus, null)) {
- metaStoreService.updBrokerConf(newEntry, sBuffer, result);
- if (result.isSuccess()) {
- triggerBrokerConfDataSync(newEntry,
- curEntry.getManageStatus().getCode(), true, sBuffer, result);
+ if (metaStoreService.updBrokerConf(newEntry, sBuffer, result)) {
+ triggerBrokerConfDataSync(newEntry.getBrokerId(), sBuffer, result);
}
} else {
result.setSuccResult(null);
@@ -700,8 +577,7 @@ public class MetaDataManager implements Server {
retInfo.add(new BrokerProcessResult(brokerId, "", result));
continue;
}
- triggerBrokerConfDataSync(curEntry,
- curEntry.getManageStatus().getCode(), true, sBuffer, result);
+ triggerBrokerConfDataSync(curEntry.getBrokerId(), sBuffer, result);
retInfo.add(new BrokerProcessResult(brokerId, curEntry.getBrokerIp(), result));
}
return retInfo;
@@ -722,55 +598,61 @@ public class MetaDataManager implements Server {
StringBuilder sBuffer,
ProcessResult result) {
List<BrokerProcessResult> retInfo = new ArrayList<>();
- Map<Integer, BrokerConfEntity> cfmBrokerMap = new HashMap<>();
- Map<Integer, BrokerConfEntity> tgtBrokerConfMap =
- getBrokerConfInfo(brokerIdSet, null, null);
- // check target broker configure's status
- for (BrokerConfEntity entity : tgtBrokerConfMap.values()) {
- if (entity == null) {
+ for (int brokerId : brokerIdSet) {
+ // check broker status
+ if (!isAllowDeleteBrokerConf(brokerId, rsvData, sBuffer, result)) {
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
continue;
}
- if (!isMatchDeleteConds(entity.getBrokerId(),
- entity.getManageStatus(), rsvData, sBuffer, result)) {
- retInfo.add(new BrokerProcessResult(
- entity.getBrokerId(), entity.getBrokerIp(), result));
- }
- cfmBrokerMap.put(entity.getBrokerId(), entity);
- }
- if (cfmBrokerMap.isEmpty()) {
- return retInfo;
- }
- // execute delete operation
- for (BrokerConfEntity entry : cfmBrokerMap.values()) {
- if (entry == null) {
+ BrokerConfEntity entity =
+ metaStoreService.getBrokerConfByBrokerId(brokerId);
+ if (entity == null) {
+ result.setSuccResult(null);
+ retInfo.add(new BrokerProcessResult(brokerId, "", result));
continue;
}
- delBrokerConfig(operator, entry.getBrokerId(), rsvData, sBuffer, result);
- retInfo.add(new BrokerProcessResult(
- entry.getBrokerId(), entry.getBrokerIp(), result));
+ delBrokerConfig(operator, entity.getBrokerId(), rsvData, sBuffer, result);
+ retInfo.add(new BrokerProcessResult(entity.getBrokerId(),
+ entity.getBrokerIp(), result));
}
return retInfo;
}
- private boolean isMatchDeleteConds(int brokerId, ManageStatus brokerStatus,
- boolean rsvData, StringBuilder sBuffer,
- ProcessResult result) {
- Map<String, TopicDeployEntity> topicConfigMap =
- getBrokerTopicConfEntitySet(brokerId);
- if (topicConfigMap == null || topicConfigMap.isEmpty()) {
+ private boolean isAllowDeleteBrokerConf(int brokerId, boolean rsvData,
+ StringBuilder sBuffer, ProcessResult result) {
+ BrokerConfEntity entity =
+ metaStoreService.getBrokerConfByBrokerId(brokerId);
+ if (entity == null) {
result.setSuccResult(null);
return result.isSuccess();
}
- if (WebParameterUtils.checkBrokerInOfflining(brokerId,
- brokerStatus.getCode(), this)) {
+ // check broker's manage status
+ if (entity.getManageStatus().isOnlineStatus()) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
+ "Broker manage status is online, please offline first!");
+ return result.isSuccess();
+ }
+ BrokerRunManager brokerRunManager = tMaster.getBrokerRunManager();
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runStatusInfo != null
+ && entity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
+ && runStatusInfo.inProcessingStatus()) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
sBuffer.append("Illegal value: the broker is processing offline event by brokerId=")
- .append(brokerId).append(", please wait and try later!").toString());
+ .append(brokerId).append(", please offline first and try later!").toString());
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
+ // check broker's topic configures
+ Map<String, TopicDeployEntity> topiConfMap =
+ metaStoreService.getConfiguredTopicInfo(brokerId);
+ if (topiConfMap == null || topiConfMap.isEmpty()) {
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
if (rsvData) {
- for (Map.Entry<String, TopicDeployEntity> entry : topicConfigMap.entrySet()) {
+ for (Map.Entry<String, TopicDeployEntity> entry : topiConfMap.entrySet()) {
if (entry.getValue() == null) {
continue;
}
@@ -779,8 +661,7 @@ public class MetaDataManager implements Server {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
sBuffer.append("The topic ").append(entry.getKey())
.append("'s acceptPublish and acceptSubscribe parameters")
- .append(" must be false in broker=")
- .append(brokerId)
+ .append(" must be false in broker=").append(brokerId)
.append(" before broker delete by reserve data method!").toString());
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
@@ -788,8 +669,7 @@ public class MetaDataManager implements Server {
}
} else {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- sBuffer.append("Topic configure of broker by brokerId=")
- .append(brokerId)
+ sBuffer.append("Topic configure of broker by brokerId=").append(brokerId)
.append(" not deleted, please delete broker's topic configure first!").toString());
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
@@ -798,7 +678,6 @@ public class MetaDataManager implements Server {
return result.isSuccess();
}
-
private boolean delBrokerConfig(String operator, int brokerId, boolean rsvData,
StringBuilder strBuffer, ProcessResult result) {
BrokerConfEntity curEntity =
@@ -828,20 +707,19 @@ public class MetaDataManager implements Server {
"Broker manage status is online, please offline first!");
return result.isSuccess();
}
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.brokerRunSyncManageMap.get(curEntity.getBrokerId());
- if (brokerSyncStatusInfo != null) {
- if (brokerSyncStatusInfo.isBrokerRegister()
- && (curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
- && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED)) {
+ BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+ BrokerRunStatusInfo runStatusInfo = brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runStatusInfo != null) {
+ if ((curEntity.getManageStatus() == ManageStatus.STATUS_MANAGE_OFFLINE
+ && runStatusInfo.inProcessingStatus())) {
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
"Broker is processing offline event, please wait and try later!");
return result.isSuccess();
}
}
if (metaStoreService.delBrokerConf(operator, brokerId, strBuffer, result)) {
- this.brokerRunSyncManageMap.remove(brokerId);
- delBrokerRunData(brokerId);
+ brokerRunManager.delBrokerStaticInfo(brokerId);
+ brokerRunManager.releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
}
return result.isSuccess();
}
@@ -849,74 +727,28 @@ public class MetaDataManager implements Server {
/**
* Manual reload broker config info
*
- * @param entity
- * @param oldManageStatus
- * @param needFastStart
+ * @param brokerId
+ * @param strBuffer
+ * @param result
* @return true if success otherwise false
* @throws Exception
*/
- public boolean triggerBrokerConfDataSync(BrokerConfEntity entity,
- int oldManageStatus,
- boolean needFastStart,
+ public boolean triggerBrokerConfDataSync(int brokerId,
StringBuilder strBuffer,
ProcessResult result) {
if (!metaStoreService.checkStoreStatus(true, result)) {
return result.isSuccess();
}
- BrokerConfEntity curEntity =
- metaStoreService.getBrokerConfByBrokerId(entity.getBrokerId());
- if (curEntity == null) {
- result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
- "The broker configure not exist!");
- return result.isSuccess();
- }
- String curBrokerConfStr = curEntity.getBrokerDefaultConfInfo();
- Map<String, String> curBrokerTopicConfStrSet =
- getBrokerTopicStrConfigInfo(curEntity, strBuffer);
- List<String> brokerTopicSetConfInfoList =
- new ArrayList<>(curBrokerTopicConfStrSet.size());
- for (String topicItem : curBrokerTopicConfStrSet.values()) {
- brokerTopicSetConfInfoList.add(topicItem);
- }
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- this.brokerRunSyncManageMap.get(entity.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- brokerSyncStatusInfo =
- new BrokerSyncStatusInfo(entity, brokerTopicSetConfInfoList);
- BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
- brokerRunSyncManageMap.putIfAbsent(entity.getBrokerId(), brokerSyncStatusInfo);
- if (tmpBrokerSyncStatusInfo != null) {
- brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
- }
- }
- if (brokerSyncStatusInfo.isBrokerRegister()
- && brokerSyncStatusInfo.getBrokerRunStatus() != TStatusConstants.STATUS_SERVICE_UNDEFINED) {
- result.setFailResult(DataOpErrCode.DERR_ILLEGAL_STATUS.getCode(),
- strBuffer.append("The broker is processing online event(")
- .append(brokerSyncStatusInfo.getBrokerRunStatus())
- .append("), please try later! ").toString());
- strBuffer.delete(0, strBuffer.length());
+ BrokerRunManager brokerRunManager = this.tMaster.getBrokerRunManager();
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunManager.getBrokerRunStatusInfo(brokerId);
+ if (runStatusInfo == null) {
+ result.setSuccResult(null);
return result.isSuccess();
}
- if (brokerSyncStatusInfo.isFastStart()) {
- brokerSyncStatusInfo.setFastStart(needFastStart);
- }
- int curManageStatus = curEntity.getManageStatus().getCode();
- if (curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE
- || curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE
- || curManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) {
- boolean isOnlineUpdate =
- (oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE
- || oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE
- || oldManageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ);
- brokerSyncStatusInfo.updateCurrBrokerConfInfo(curManageStatus,
- curEntity.isConfDataUpdated(), curEntity.isBrokerLoaded(), curBrokerConfStr,
- brokerTopicSetConfInfoList, isOnlineUpdate);
- } else {
- brokerSyncStatusInfo.setBrokerOffline();
- }
- strBuffer.append("triggered broker syncStatus info is ");
- logger.info(brokerSyncStatusInfo.toJsonString(strBuffer, false).toString());
+ runStatusInfo.notifyDataChanged();
+ strBuffer.append("[Meta data] triggered broker syncStatus info is ");
+ logger.info(runStatusInfo.toJsonString(strBuffer).toString());
strBuffer.delete(0, strBuffer.length());
result.setSuccResult(null);
return result.isSuccess();
@@ -1021,14 +853,6 @@ public class MetaDataManager implements Server {
}
- public ConcurrentHashMap<Integer, BrokerSyncStatusInfo> getBrokerRunSyncManageMap() {
- return this.brokerRunSyncManageMap;
- }
-
- public BrokerSyncStatusInfo getBrokerRunSyncStatusInfo(int brokerId) {
- return this.brokerRunSyncManageMap.get(brokerId);
- }
-
public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
return metaStoreService.getBrokerConfByBrokerId(brokerId);
}
@@ -1041,154 +865,6 @@ public class MetaDataManager implements Server {
return metaStoreService.getConfiguredTopicInfo(brokerId);
}
- public ConcurrentHashMap<String/* topicName */, TopicInfo> getBrokerRunTopicInfoMap(
- final int brokerId) {
- return this.brokerRunTopicInfoStoreMap.get(brokerId);
- }
-
- public void removeBrokerRunTopicInfoMap(final int brokerId) {
- this.brokerRunTopicInfoStoreMap.remove(brokerId);
- }
-
- public void updateBrokerRunTopicInfoMap(final int brokerId,
- ConcurrentHashMap<String, TopicInfo> topicInfoMap) {
- this.brokerRunTopicInfoStoreMap.put(brokerId, topicInfoMap);
- }
-
- public void resetBrokerReportInfo(final int brokerId) {
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerRunSyncManageMap.get(brokerId);
- if (brokerSyncStatusInfo != null) {
- brokerSyncStatusInfo.resetBrokerReportInfo();
- }
- brokerRunTopicInfoStoreMap.remove(brokerId);
- }
-
- /**
- * Update broker config
- *
- * @param brokerId
- * @param isChanged
- * @param isFasterStart
- * @return true if success otherwise false
- */
- public boolean updateBrokerConfChanged(int brokerId,
- boolean isChanged,
- boolean isFasterStart,
- StringBuilder strBuffer,
- ProcessResult result) {
- if (!metaStoreService.checkStoreStatus(true, result)) {
- return result.isSuccess();
- }
- BrokerConfEntity curEntity =
- metaStoreService.getBrokerConfByBrokerId(brokerId);
- if (curEntity == null) {
- return false;
- }
- // This function needs to be optimized continue
- if (isChanged) {
- if (!curEntity.isConfDataUpdated()) {
- curEntity.setConfDataUpdated();
- modBrokerConfig(curEntity, strBuffer, result);
- }
- if (curEntity.getManageStatus().isApplied()) {
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerRunSyncManageMap.get(curEntity.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- Map<String, String> newBrokerTopicConfStrSet =
- getBrokerTopicStrConfigInfo(curEntity, strBuffer);
- List<String> brokerTopicSetConfInfoList =
- new ArrayList<>(newBrokerTopicConfStrSet.size());
- for (String topicItem : newBrokerTopicConfStrSet.values()) {
- brokerTopicSetConfInfoList.add(topicItem);
- }
- brokerSyncStatusInfo =
- new BrokerSyncStatusInfo(curEntity, brokerTopicSetConfInfoList);
- BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
- brokerRunSyncManageMap.putIfAbsent(curEntity.getBrokerId(), brokerSyncStatusInfo);
- if (tmpBrokerSyncStatusInfo != null) {
- brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
- }
- }
- if (brokerSyncStatusInfo.isFastStart()) {
- brokerSyncStatusInfo.setFastStart(isFasterStart);
- }
- if (!brokerSyncStatusInfo.isBrokerConfChanged()) {
- brokerSyncStatusInfo.setBrokerConfChanged();
- }
- }
- } else {
- if (curEntity.isConfDataUpdated()) {
- curEntity.setBrokerLoaded();
- modBrokerConfig(curEntity, strBuffer, result);
- }
- if (curEntity.getManageStatus().isApplied()) {
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- brokerRunSyncManageMap.get(curEntity.getBrokerId());
- if (brokerSyncStatusInfo == null) {
- Map<String, String> newBrokerTopicConfStrSet =
- getBrokerTopicStrConfigInfo(curEntity, strBuffer);
- List<String> brokerTopicSetConfInfoList =
- new ArrayList<>(newBrokerTopicConfStrSet.size());
- for (String topicItem : newBrokerTopicConfStrSet.values()) {
- brokerTopicSetConfInfoList.add(topicItem);
- }
- brokerSyncStatusInfo =
- new BrokerSyncStatusInfo(curEntity, brokerTopicSetConfInfoList);
- BrokerSyncStatusInfo tmpBrokerSyncStatusInfo =
- brokerRunSyncManageMap.putIfAbsent(curEntity.getBrokerId(),
- brokerSyncStatusInfo);
- if (tmpBrokerSyncStatusInfo != null) {
- brokerSyncStatusInfo = tmpBrokerSyncStatusInfo;
- }
- }
- if (brokerSyncStatusInfo.isBrokerConfChanged()) {
- brokerSyncStatusInfo.setBrokerLoaded();
- brokerSyncStatusInfo.setFastStart(isFasterStart);
- }
- }
- }
- return true;
- }
-
- public void updateBrokerMaps(BrokerConfEntity entity) {
- if (entity != null) {
- String brokerReg =
- this.brokersMap.putIfAbsent(entity.getBrokerId(),
- entity.getSimpleBrokerInfo());
- String brokerTLSReg =
- this.brokersTLSMap.putIfAbsent(entity.getBrokerId(),
- entity.getSimpleTLSBrokerInfo());
- if (brokerReg == null
- || brokerTLSReg == null
- || !brokerReg.equals(entity.getSimpleBrokerInfo())
- || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
- if (brokerReg != null
- && !brokerReg.equals(entity.getSimpleBrokerInfo())) {
- this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
- }
- if (brokerTLSReg != null
- && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
- this.brokersTLSMap.put(entity.getBrokerId(), entity.getSimpleTLSBrokerInfo());
- }
- this.lastBrokerUpdatedTime = System.currentTimeMillis();
- this.brokerInfoCheckSum.set(this.lastBrokerUpdatedTime);
- }
- }
- }
-
- public void delBrokerRunData(int brokerId) {
- if (brokerId == TBaseConstants.META_VALUE_UNDEFINED) {
- return;
- }
- String brokerReg = this.brokersMap.remove(brokerId);
- String brokerTLSReg = this.brokersTLSMap.remove(brokerId);
- if (brokerReg != null || brokerTLSReg != null) {
- this.lastBrokerUpdatedTime = System.currentTimeMillis();
- this.brokerInfoCheckSum.set(this.lastBrokerUpdatedTime);
- }
- }
-
// ////////////////////////////////////////////////////////////////////////////
public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
index 60b5ac5..ad6afa0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.java
@@ -227,14 +227,14 @@ public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
// get broker configures
if (qryBrokerKey == null) {
for (BrokerConfEntity entity : brokerConfCache.values()) {
- if (entity != null && entity.isMatched(qryEntity)) {
+ if (entity != null && qryEntity != null && entity.isMatched(qryEntity)) {
retMap.put(entity.getBrokerId(), entity);
}
}
} else {
for (Integer brokerId : qryBrokerKey) {
BrokerConfEntity entity = brokerConfCache.get(brokerId);
- if (entity != null && entity.isMatched(qryEntity)) {
+ if (entity != null && qryEntity != null && entity.isMatched(qryEntity)) {
retMap.put(entity.getBrokerId(), entity);
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
similarity index 83%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 0040d5b..51f886d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -18,9 +18,7 @@
package org.apache.tubemq.server.master.nodemanage.nodebroker;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -29,8 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
@@ -40,37 +37,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BrokerInfoHolder {
+public class BrokerAbnHolder {
private static final Logger logger =
- LoggerFactory.getLogger(BrokerInfoHolder.class);
- private final ConcurrentHashMap<Integer/* brokerId */, BrokerInfo> brokerInfoMap =
- new ConcurrentHashMap<>();
+ LoggerFactory.getLogger(BrokerAbnHolder.class);
private final ConcurrentHashMap<Integer/* brokerId */, BrokerAbnInfo> brokerAbnormalMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer/* brokerId */, BrokerFbdInfo> brokerForbiddenMap =
new ConcurrentHashMap<>();
private final int maxAutoForbiddenCnt;
private final MetaDataManager metaDataManager;
- private AtomicInteger brokerTotalCount = new AtomicInteger(0);
private AtomicInteger brokerForbiddenCount = new AtomicInteger(0);
- public BrokerInfoHolder(final int maxAutoForbiddenCnt,
- final MetaDataManager metaDataManager) {
+ public BrokerAbnHolder(final int maxAutoForbiddenCnt,
+ final MetaDataManager metaDataManager) {
this.maxAutoForbiddenCnt = maxAutoForbiddenCnt;
this.metaDataManager = metaDataManager;
}
- public BrokerInfo getBrokerInfo(int brokerId) {
- return brokerInfoMap.get(brokerId);
- }
-
- public void setBrokerInfo(int brokerId, BrokerInfo brokerInfo) {
- if (brokerInfoMap.put(brokerId, brokerInfo) == null) {
- this.brokerTotalCount.incrementAndGet();
- }
- }
-
public void updateBrokerReportStatus(int brokerId,
int reportReadStatus,
int reportWriteStatus) {
@@ -156,44 +140,15 @@ public class BrokerInfoHolder {
}
}
- public void setBrokerHeartBeatReqStatus(int brokerId,
- ClientMaster.HeartResponseM2B.Builder builder) {
+ public Tuple2<Boolean, Boolean> getBrokerAutoFbdStatus(int brokerId) {
+ Tuple2<Boolean, Boolean> retTuple = new Tuple2<>(false, false);
BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.get(brokerId);
if (brokerFbdInfo == null) {
- builder.setStopWrite(false);
- builder.setStopRead(false);
- } else {
- switch (brokerFbdInfo.newStatus) {
- case STATUS_MANAGE_ONLINE_NOT_READ: {
- builder.setStopWrite(false);
- builder.setStopRead(true);
- }
- break;
- case STATUS_MANAGE_ONLINE_NOT_WRITE: {
- builder.setStopWrite(true);
- builder.setStopRead(false);
- }
- break;
- case STATUS_MANAGE_OFFLINE: {
- builder.setStopWrite(true);
- builder.setStopRead(true);
- }
- break;
- case STATUS_MANAGE_ONLINE:
- default: {
- builder.setStopWrite(false);
- builder.setStopRead(false);
- }
- }
+ return retTuple;
}
- }
-
- public Map<Integer, BrokerInfo> getBrokerInfos(Collection<Integer> brokerIds) {
- HashMap<Integer, BrokerInfo> brokerMap = new HashMap<>();
- for (Integer brokerId : brokerIds) {
- brokerMap.put(brokerId, brokerInfoMap.get(brokerId));
- }
- return brokerMap;
+ retTuple.setF0AndF1(brokerFbdInfo.newStatus.isAcceptPublish(),
+ brokerFbdInfo.newStatus.isAcceptSubscribe());
+ return retTuple;
}
/**
@@ -202,25 +157,15 @@ public class BrokerInfoHolder {
* @param brokerId
* @return the deleted broker info
*/
- public BrokerInfo removeBroker(Integer brokerId) {
- BrokerInfo brokerInfo = brokerInfoMap.remove(brokerId);
+ public void removeBroker(Integer brokerId) {
brokerAbnormalMap.remove(brokerId);
BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.remove(brokerId);
- if (brokerInfo != null) {
- this.brokerTotalCount.decrementAndGet();
- }
if (brokerFbdInfo != null) {
this.brokerForbiddenCount.decrementAndGet();
}
- return brokerInfo;
- }
-
- public Map<Integer, BrokerInfo> getBrokerInfoMap() {
- return brokerInfoMap;
}
public void clear() {
- brokerInfoMap.clear();
brokerForbiddenCount.set(0);
brokerAbnormalMap.clear();
brokerForbiddenMap.clear();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index 3fd9cd4..50a3dc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -75,6 +75,11 @@ public class BrokerPSInfoHolder {
}
}
+ public Tuple2<Boolean, Boolean> getBrokerPubStatus(int brokerId) {
+ return new Tuple2<>(enablePubBrokerIdSet.contains(brokerId),
+ enableSubBrokerIdSet.contains(brokerId));
+ }
+
/**
* update broker's subscribe topicInfo configures
*
@@ -131,11 +136,11 @@ public class BrokerPSInfoHolder {
}
/**
- * Gets the list of topic partitions whose subscribe status is enabled
+ * Gets the map of topic partitions whose subscribe status is enabled
*
* @param topicSet need query topic set
*/
- public List<Partition> getAcceptSubParts(Set<String> topicSet) {
+ public Map<String, Partition> getAcceptSubParts(Set<String> topicSet) {
return subTopicInfoView.getAcceptSubParts(topicSet, enableSubBrokerIdSet);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index b37e37a..28f8832 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -19,35 +19,64 @@ package org.apache.tubemq.server.master.nodemanage.nodebroker;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
public interface BrokerRunManager {
- boolean brokerRegister2M(String clientId, boolean enableTls,
- int tlsPort, long reportConfigId,
- int reportCheckSumId, boolean isTackData,
- String repBrokerConfInfo,
- List<String> repTopicConfInfo,
- boolean isOnline, boolean isOverTLS,
- StringBuilder sBuffer, ProcessResult result);
-
- boolean brokerHeartBeat2M(int brokerId, long reportConfigId,
- int reportCheckSumId, boolean isTackData,
- String repBrokerConfInfo,
- List<String> repTopicConfInfo, boolean isOnline,
+ void updBrokerStaticInfo(Map<Integer, BrokerConfEntity> brokerConfMap);
+
+ void updBrokerStaticInfo(BrokerConfEntity entity);
+
+ Tuple2<Long, Map<Integer, String>> getBrokerStaticInfo(boolean isOverTLS);
+
+ void delBrokerStaticInfo(int brokerId);
+
+ boolean brokerRegister2M(String clientId, BrokerInfo brokerInfo,
+ long reportConfigId, int reportCheckSumId,
+ boolean isTackData, String repBrokerConfInfo,
+ List<String> repTopicConfInfo, boolean isOnline,
+ boolean isOverTLS, StringBuilder sBuffer,
+ ProcessResult result);
+
+ boolean brokerHeartBeat2M(int brokerId, long reportConfigId, int reportCheckSumId,
+ boolean isTackData, String repBrokerConfInfo,
+ List<String> repTopicConfInfo,
+ boolean isTackRmvInfo, List<String> removedTopics,
+ int rptReadStatus, int rptWriteStatus, boolean isOnline,
StringBuilder sBuffer, ProcessResult result);
- boolean brokerClose2M(int brokerId);
+ boolean brokerClose2M(int brokerId, StringBuilder sBuffer, ProcessResult result);
+
+ boolean releaseBrokerRunInfo(int brokerId, String blockId);
- boolean brokerTimeout(int brokerId, long bookedId);
+ BrokerRunStatusInfo getBrokerRunStatusInfo(int brokerId);
+ Tuple2<Boolean, Boolean> getBrokerPublishStatus(int brokerId);
Tuple3<ManageStatus, String, Map<String, String>> getBrokerMetaConfigInfo(int brokerId);
+ void setRegisterDownConfInfo(int brokerId, StringBuilder sBuffer,
+ RegisterResponseM2B.Builder builder);
+
+ void setHeatBeatDownConfInfo(int brokerId, StringBuilder sBuffer,
+ HeartResponseM2B.Builder builder);
+
+ BrokerInfo getBrokerInfo(int brokerId);
+
+ Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds);
+
void updBrokerCsmConfInfo(int brokerId,
ManageStatus mngStatus,
Map<String, TopicInfo> topicInfoMap);
@@ -56,4 +85,18 @@ public interface BrokerRunManager {
ManageStatus mngStatus,
Map<String, TopicInfo> topicInfoMap);
+ BrokerAbnHolder getBrokerAbnHolder();
+
+ Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet);
+
+ int getSubTopicMaxBrokerCount(Set<String> topicSet);
+
+ Map<String, Partition> getSubBrokerAcceptSubParts(Set<String> topicSet);
+
+ List<Partition> getSubBrokerAcceptSubParts(String topic);
+
+ TopicInfo getPubBrokerTopicInfo(int brokerId, String topic);
+
+ List<TopicInfo> getPubBrokerPushedTopicInfo(int brokerId);
+
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
index 45c5459..80ffd80 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
@@ -36,6 +36,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
public class BrokerRunStatusInfo {
private static final Logger logger =
@@ -46,7 +48,7 @@ public class BrokerRunStatusInfo {
private final BrokerRunManager brokerRunManager;
private BrokerInfo brokerInfo;
- private long createId;
+ private String createId;
// config change flag
private AtomicBoolean isConfChanged = new AtomicBoolean(false);
private AtomicLong confChangeNo =
@@ -60,7 +62,7 @@ public class BrokerRunStatusInfo {
// broker sync data info
BrokerSyncData brokerSyncData = new BrokerSyncData();
// broker status conditions
- private boolean isOnline = false; // broker online flag
+ private boolean isOnline = false; // broker online flag
private boolean isDoneDataLoad = false;
private boolean isDoneDataSub = false;
private boolean isDoneDataPub = false;
@@ -83,7 +85,7 @@ public class BrokerRunStatusInfo {
String brokerConfInfo, Map<String, String> topicConfInfoMap,
boolean isOverTls) {
resetStatusInfo();
- this.createId = System.nanoTime();
+ this.createId = String.valueOf(System.nanoTime());
this.brokerInfo = brokerInfo;
long curTime = System.currentTimeMillis();
this.isOverTLS = isOverTls;
@@ -111,20 +113,37 @@ public class BrokerRunStatusInfo {
&& this.confChangeNo.get() != this.confLoadedNo.get()));
}
- public long getCreateId() {
+ public String getCreateId() {
return this.createId;
}
+ public boolean isOverTLS() {
+ return isOverTLS;
+ }
+
+ public BrokerInfo getBrokerInfo() {
+ return brokerInfo;
+ }
+
+ public boolean inProcessingStatus() {
+ return curStepStatus != StepStatus.STEP_STATUS_UNDEFINED;
+ }
+
+ public StepStatus getCurStepStatus() {
+ return curStepStatus;
+ }
+
+ public boolean isOnline() {
+ return isOnline;
+ }
/**
* Get need sync to broker's data
*
- * @param retValue return data container, ** must not null **
- * @return void
+ * @return return data container
*/
- public void getNeedSyncData(Tuple4<Long,
- Integer, String, Map<String, String>> retValue) {
- brokerSyncData.getBrokerSyncData(retValue);
+ public Tuple4<Long, Integer, String, List<String>> getNeedSyncData() {
+ return brokerSyncData.getBrokerSyncData();
}
/**
@@ -151,6 +170,31 @@ public class BrokerRunStatusInfo {
goNextStatus(isRegister, isSynchronized, sBuffer);
}
+ /* Format to json */
+ public StringBuilder toJsonString(StringBuilder sBuffer) {
+ Tuple2<Boolean, Boolean> confStatusTuple = getDataSyncStatus();
+ sBuffer.append("\"BrokerRunStatusInfo\":{\"type\":\"BrokerRunStatusInfo\"")
+ .append(",\"brokerInfo\":\"").append(brokerInfo.getBrokerStrInfo())
+ .append("\",\"createId\":\"").append(createId)
+ .append("\",\"isConfChanged\":").append(confStatusTuple.getF0())
+ .append("\",\"isConfLoaded\":").append(confStatusTuple.getF1())
+ .append(",\"confChangeNo\":").append(confChangeNo.get())
+ .append(",\"curStepStatus\":\"").append(curStepStatus.getDescription())
+ .append("\",\"nextStepOpTimeInMills\":").append(nextStepOpTimeInMills)
+ .append("\",\"confLoadedNo\":").append(confLoadedNo.get())
+ .append(",\"isOnline\":").append(isOnline)
+ .append(",\"isDoneDataLoad\":").append(isDoneDataLoad)
+ .append(",\"isDoneDataSub\":").append(isDoneDataSub)
+ .append(",\"isDoneDataPub\":").append(isDoneDataPub)
+ .append(",\"isDoneDataPub\":").append(isDoneDataPub)
+ .append(",\"isOverTLS\":").append(isOverTLS)
+ .append(",\"lastBrokerSyncTime\":").append(lastBrokerSyncTime)
+ .append(",\"maxConfLoadedTimeInMs\":").append(maxConfLoadedTimeInMs)
+ .append(",\"curConfLoadTimeInMs\":").append(curConfLoadTimeInMs)
+ .append(",\"BrokerSyncData\":");
+ return brokerSyncData.toJsonString(sBuffer);
+ }
+
private void goNextStatus(boolean isRegister,
boolean isSynchronized,
StringBuilder sBuffer) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index 09de48d..b320ed1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -166,17 +166,20 @@ public class BrokerSyncData {
/**
* Get need sync to broker's data
*
- * @param retValue return data container, ** must not null **
- * @return void
+ * @return return data container,
*/
- public void getBrokerSyncData(Tuple4<Long,
- Integer, String, Map<String, String>> retValue) {
+ public Tuple4<Long, Integer, String, List<String>> getBrokerSyncData() {
if (isConfSynchronized()) {
- retValue.setFieldsValue(syncDownDataConfId.get(), syncDownDataChkSumId,
- syncDownBrokerConfInfo, syncDownTopicConfInfoMap);
+ return new Tuple4<>(syncDownDataConfId.get(), syncDownDataChkSumId, null, null);
} else {
- retValue.setFieldsValue(syncDownDataConfId.get(),
- syncDownDataChkSumId, null, null);
+ List<String> topicInfoList = new ArrayList<>();
+ for (String topicInfo : syncDownTopicConfInfoMap.values()) {
+ if (topicInfo != null) {
+ topicInfoList.add(topicInfo);
+ }
+ }
+ return new Tuple4<>(syncDownDataConfId.get(), syncDownDataChkSumId,
+ syncDownBrokerConfInfo, topicInfoList);
}
}
@@ -214,6 +217,25 @@ public class BrokerSyncData {
|| !Objects.equals(syncDownTopicConfInfoMap, topicConfInfoMap);
}
+ /* Format to json */
+ public StringBuilder toJsonString(StringBuilder sBuffer) {
+ sBuffer.append("{\"dataPushId\":").append(dataPushId)
+ .append(",\"mngStatus\":\"").append(mngStatus.getDescription())
+ .append("\",\"syncDownDataConfId\":").append(syncDownDataConfId.get())
+ .append(",\"syncDownDataChkSumId\":").append(syncDownDataChkSumId)
+ .append(",\"isStatusChanged\":").append(isStatusChanged)
+ .append(",\"isConfChanged\":").append(isConfChanged)
+ .append(",\"syncDownBrokerConfInfo\":\"").append(syncDownBrokerConfInfo)
+ .append("\",\"syncDownTopicConfInfoMap\":\"").append(syncDownTopicConfInfoMap.toString())
+ .append("\",\"syncUpDataConfId\":").append(syncUpDataConfId)
+ .append(",\"syncUpDataChkSumId\":").append(syncUpDataChkSumId)
+ .append(",\"syncUpBrokerConfInfo\":\"").append(syncUpBrokerConfInfo)
+ .append("\",\"syncUpTopicConfInfos\":\"").append(syncUpTopicConfInfos.toString())
+ .append("\",\"lastDataUpTime\":").append(lastDataUpTime)
+ .append("}");
+ return sBuffer;
+ }
+
/**
* parse broker report configure to topicInfo
*
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
deleted file mode 100644
index d0eedcf..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++ /dev/null
@@ -1,843 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.nodemanage.nodebroker;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.corebase.utils.CheckSum;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.Tuple2;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
-import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class BrokerSyncStatusInfo {
- private static final Logger logger =
- LoggerFactory.getLogger(BrokerSyncStatusInfo.class);
-
- private boolean isFirstInit = true;
- private int brokerId = -2;
- private String brokerIp; //broker ip
- private int brokerPort; //broker port
- private int brokerTLSPort; //broker tls port
-
- /* Broker manager stratus :-2:undefine,1:pending for approval,5:online,7:offline */
- private int brokerManageStatus = -2;
-
- /* Broker run status, -2:undefine, 31: online(wait-server), 32:online(readonly), 33:online(read&write)
- * 51: offline(unreadable¬ writeable), 52: offline(wait load balance) */
- private int brokerRunStatus = -2;
-
- private long subStepOpTimeInMills = 0;
- private boolean isBrokerRegister = false; //broker register flag
- private boolean isBrokerOnline = false; //broker online flag
- private boolean isOverTLS = false; //enable tls
- private boolean isBrokerConfChanged = false; //config change flag
- private boolean isBrokerLoaded = false; //broker load status
- private boolean isFastStart = false; //enable fast start
-
- private long lastPushBrokerConfId = -2;
- private int lastPushBrokerCheckSumId = -2;
- private long lastDataPushInMills = 0;
- private String lastPushBrokerDefaultConfInfo;
- private List<String> lastPushBrokerTopicSetConfInfo =
- new ArrayList<>();
-
- private long reportedBrokerConfId = -2;
- private int reportedBrokerCheckSumId = -2;
- private long lastDataReportInMills = 0;
- private String reportedBrokerDefaultConfInfo;
- private List<String> reportedBrokerTopicSetConfInfo =
- new ArrayList<>();
-
- private AtomicLong currBrokerConfId = new AtomicLong(0);
- private int currBrokerCheckSumId = 0;
- private String curBrokerDefaultConfInfo;
- private List<String> curBrokerTopicSetConfInfo =
- new ArrayList<>();
-
- private int numPartitions = 1; //partition number
- private int numTopicStores = 1; //store number
- private int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
- private int unflushThreshold = 1000; //flush threshold
- private int unflushInterval = 10000; //flush interval
- private int memCacheMsgSizeInMB = 3; //memory cache size
- private int memCacheMsgCntInK = 10; //memory cache message count
- private int memCacheFlushIntvl = 20000; //memory cache flush interval
- private String deletePolicy = "delete,168h"; //data delete policy
- private String deleteWhen = "0 0 6,18 * * ?"; //date delete policy execute time
- private boolean acceptPublish = true; //accept publish
- private boolean acceptSubscribe = true; //accept subscribe
-
- //Constructor
- public BrokerSyncStatusInfo(final BdbBrokerConfEntity bdbEntity,
- List<String> brokerTopicSetConfInfo) {
- updateBrokerConfigureInfo(bdbEntity.getBrokerDefaultConfInfo(),
- brokerTopicSetConfInfo);
- this.brokerManageStatus = bdbEntity.getManageStatus();
- this.isBrokerConfChanged = bdbEntity.isConfDataUpdated();
- this.isBrokerLoaded = bdbEntity.isBrokerLoaded();
- this.brokerId = bdbEntity.getBrokerId();
- this.brokerIp = bdbEntity.getBrokerIp();
- this.brokerPort = bdbEntity.getBrokerPort();
- this.brokerTLSPort = bdbEntity.getBrokerTLSPort();
- this.isFastStart = false;
- if (this.brokerManageStatus > TStatusConstants.STATUS_MANAGE_APPLY) {
- currBrokerConfId.incrementAndGet();
- }
- }
-
- //Constructor
- public BrokerSyncStatusInfo(BrokerConfEntity brokerEntity,
- List<String> brokerTopicSetConfInfo) {
- updateBrokerConfigureInfo(brokerEntity.getBrokerDefaultConfInfo(),
- brokerTopicSetConfInfo);
- this.brokerManageStatus = brokerEntity.getManageStatus().getCode();
- this.isBrokerConfChanged = brokerEntity.isConfDataUpdated();
- this.isBrokerLoaded = brokerEntity.isBrokerLoaded();
- this.brokerId = brokerEntity.getBrokerId();
- this.brokerIp = brokerEntity.getBrokerIp();
- this.brokerPort = brokerEntity.getBrokerPort();
- this.brokerTLSPort = brokerEntity.getBrokerTLSPort();
- this.isFastStart = false;
- if (this.brokerManageStatus > TStatusConstants.STATUS_MANAGE_APPLY) {
- currBrokerConfId.incrementAndGet();
- }
- }
- /**
- * Update current broker config info
- *
- * @param brokerManageStatus broker status
- * @param isBrokerConfChanged
- * @param isBrokerLoaded
- * @param brokerDefaultConfInfo broker default config
- * @param brokerTopicSetConfInfo topic config
- * @param isOnlineUpdate
- */
- public void updateCurrBrokerConfInfo(int brokerManageStatus, boolean isBrokerConfChanged,
- boolean isBrokerLoaded, String brokerDefaultConfInfo,
- List<String> brokerTopicSetConfInfo,
- boolean isOnlineUpdate) {
- this.brokerManageStatus = brokerManageStatus;
- this.currBrokerConfId.incrementAndGet();
- this.isBrokerConfChanged = isBrokerConfChanged;
- this.isBrokerLoaded = isBrokerLoaded;
- updateBrokerConfigureInfo(brokerDefaultConfInfo, brokerTopicSetConfInfo);
- this.lastPushBrokerConfId = this.currBrokerConfId.get();
- this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
- this.lastDataPushInMills = System.currentTimeMillis();
- this.lastPushBrokerDefaultConfInfo = this.curBrokerDefaultConfInfo;
- this.lastPushBrokerTopicSetConfInfo = this.curBrokerTopicSetConfInfo;
- switch (this.brokerManageStatus) {
- case TStatusConstants.STATUS_MANAGE_ONLINE: {
- this.brokerRunStatus = isOnlineUpdate
- ? TStatusConstants.STATUS_SERVICE_TOONLINE_PART_WAIT_REGISTER
- : TStatusConstants.STATUS_SERVICE_TOONLINE_WAIT_REGISTER;
- break;
- }
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
- break;
- }
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ: {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE;
- break;
- }
- default: {
- if (this.isBrokerRegister) {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
- }
- }
- }
- this.subStepOpTimeInMills = System.currentTimeMillis();
- }
-
- /**
- * Reset broker report info
- */
- public void resetBrokerReportInfo() {
- this.reportedBrokerConfId = -2;
- this.reportedBrokerCheckSumId = -2;
- this.reportedBrokerDefaultConfInfo = "";
- this.reportedBrokerTopicSetConfInfo =
- new ArrayList<>();
- this.isBrokerRegister = false;
- this.isBrokerOnline = false;
- this.isFastStart = false;
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_UNDEFINED;
- this.subStepOpTimeInMills = System.currentTimeMillis();
- }
-
- /**
- * Set broker report info
- *
- * @param isRegister
- * @param reportConfigId
- * @param reportCheckSumId
- * @param isTackData
- * @param reportDefaultConfInfo
- * @param reportTopicSetConfInfo
- * @param isBrokerRegister
- * @param isBrokerOnline
- * @param isOverTLS
- */
- public void setBrokerReportInfo(boolean isRegister, long reportConfigId,
- int reportCheckSumId, boolean isTackData,
- String reportDefaultConfInfo,
- List<String> reportTopicSetConfInfo,
- boolean isBrokerRegister, boolean isBrokerOnline, boolean isOverTLS) {
- this.reportedBrokerConfId = reportConfigId;
- this.reportedBrokerCheckSumId = reportCheckSumId;
- if (isTackData) {
- this.reportedBrokerDefaultConfInfo = reportDefaultConfInfo;
- if (reportTopicSetConfInfo == null) {
- this.reportedBrokerTopicSetConfInfo = new ArrayList<>();
- } else {
- this.reportedBrokerTopicSetConfInfo = reportTopicSetConfInfo;
- }
- this.lastDataReportInMills = System.currentTimeMillis();
- }
- this.isBrokerRegister = isBrokerRegister;
- this.isBrokerOnline = isBrokerOnline;
- this.isOverTLS = isOverTLS;
- if (isRegister) {
- if (this.isBrokerOnline) {
- if (this.reportedBrokerConfId <= 0) {
- if (this.isBrokerConfChanged
- || !this.isBrokerLoaded
- || this.reportedBrokerCheckSumId != this.lastPushBrokerCheckSumId
- || !isFirstInit) {
- return;
- }
- this.lastPushBrokerConfId = this.reportedBrokerConfId;
- this.currBrokerConfId.set(this.lastPushBrokerConfId);
- this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
- this.lastDataPushInMills = System.currentTimeMillis();
- this.brokerRunStatus =
- TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
- this.subStepOpTimeInMills = System.currentTimeMillis();
- this.isFirstInit = false;
- } else {
- this.isFirstInit = false;
- this.isFastStart = true;
- switch (this.brokerManageStatus) {
- case TStatusConstants.STATUS_MANAGE_ONLINE: {
- this.brokerRunStatus =
- TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
- break;
- }
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE: {
- this.brokerRunStatus =
- TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_READ;
- break;
- }
- case TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ: {
- this.brokerRunStatus =
- TStatusConstants.STATUS_SERVICE_TOONLINE_ONLY_WRITE;
- break;
- }
- default: {
- this.brokerRunStatus =
- TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
- }
- }
- this.subStepOpTimeInMills = 0;
- }
- }
- }
- }
-
- /**
- * Set broker status to offline
- */
- public void setBrokerOffline() {
- if (this.brokerManageStatus != TStatusConstants.STATUS_MANAGE_OFFLINE) {
- this.brokerManageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
- }
- if (this.isBrokerOnline) {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_WRITE;
- } else if (this.isBrokerRegister) {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_NOT_READ_WRITE;
- } else {
- this.brokerRunStatus = TStatusConstants.STATUS_SERVICE_TOOFFLINE_WAIT_REBALANCE;
- }
- this.subStepOpTimeInMills = System.currentTimeMillis();
- }
-
- /**
- * Check if need sync config data to broker
- *
- * @return true if need otherwise false
- */
- public boolean needSyncConfDataToBroker() {
- if (this.lastPushBrokerConfId != this.reportedBrokerConfId
- || this.lastPushBrokerCheckSumId != this.reportedBrokerCheckSumId) {
- return true;
- }
- return false;
- }
-
- /**
- * According to last report time and current time to decide if need to report data
- *
- * @return true if need report data otherwise false
- */
- public boolean needReportData() {
- if (System.currentTimeMillis() - this.lastDataPushInMills
- > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION) {
- this.lastDataPushInMills = System.currentTimeMillis();
- return true;
- }
- return false;
- }
-
- public void forceSyncConfDataToBroker() {
- this.isFastStart = true;
- this.lastPushBrokerConfId = this.currBrokerConfId.incrementAndGet();
- this.lastPushBrokerCheckSumId = this.currBrokerCheckSumId;
- this.lastPushBrokerDefaultConfInfo = this.curBrokerDefaultConfInfo;
- this.lastPushBrokerTopicSetConfInfo = this.curBrokerTopicSetConfInfo;
- this.lastDataPushInMills = System.currentTimeMillis();
- }
-
- public Long getCurrBrokerConfId() {
- return currBrokerConfId.get();
- }
-
- public int getCurrBrokerCheckSumId() {
- return currBrokerCheckSumId;
- }
-
- public String getCurBrokerDefaultConfInfo() {
- return curBrokerDefaultConfInfo;
- }
-
- public List<String> getCurBrokerTopicSetConfInfo() {
- return curBrokerTopicSetConfInfo;
- }
-
- public long getLastDataReportInMills() {
- return lastDataReportInMills;
- }
-
- public void setLastDataReportInMills(long lastDataReportInMills) {
- this.lastDataReportInMills = lastDataReportInMills;
- }
-
- public boolean isFastStart() {
- return isFastStart;
- }
-
- public void setFastStart(boolean isFastStart) {
- this.isFastStart = isFastStart;
- }
-
- public boolean isBrokerOnline() {
- return this.isBrokerOnline;
- }
-
- public void setBrokerOnline(boolean isBrokerOnline) {
- this.isBrokerOnline = isBrokerOnline;
- }
-
- public boolean isBrokerRegister() {
- return this.isBrokerRegister;
- }
-
- public void setBrokerRunStatus(boolean isBrokerRegister,
- boolean isBrokerOnline) {
- this.isBrokerRegister = isBrokerRegister;
- this.isBrokerOnline = isBrokerOnline;
- }
-
- public long getReportedBrokerConfId() {
- return reportedBrokerConfId;
- }
-
- public int getNumPartitions() {
- return numPartitions;
- }
-
- public int getUnflushThreshold() {
- return unflushThreshold;
- }
-
- public int getUnflushInterval() {
- return unflushInterval;
- }
-
- public String getDeletePolicy() {
- return deletePolicy;
- }
-
- public String getDeleteWhen() {
- return deleteWhen;
- }
-
- public boolean isAcceptPublish() {
- return acceptPublish;
- }
-
- public boolean isAcceptSubscribe() {
- return acceptSubscribe;
- }
-
- public int getBrokerRunStatus() {
- return brokerRunStatus;
- }
-
- public void setBrokerRunStatus(int brokerRunStatus) {
- this.brokerRunStatus = brokerRunStatus;
- this.subStepOpTimeInMills = System.currentTimeMillis();
- }
-
- public boolean isOverTLS() {
- return isOverTLS;
- }
-
- public void setOverTLS(boolean overTLS) {
- isOverTLS = overTLS;
- }
-
- public int getBrokerTLSPort() {
- return brokerTLSPort;
- }
-
- public void setBrokerTLSPort(int brokerTLSPort) {
- this.brokerTLSPort = brokerTLSPort;
- }
-
- public long getSubStepOpTimeInMills() {
- return subStepOpTimeInMills;
- }
-
- public int getBrokerId() {
- return brokerId;
- }
-
- public String getBrokerIp() {
- return brokerIp;
- }
-
- public int getBrokerPort() {
- return brokerPort;
- }
-
- public int getBrokerManageStatus() {
- return brokerManageStatus;
- }
-
- private int calculateConfigCrc32Value(final String brokerDefaultConfInfo,
- final List<String> brokerTopicSetConfInfo) {
- int result = -1;
- int capacity = 0;
- Collections.sort(brokerTopicSetConfInfo);
- capacity += brokerDefaultConfInfo.length();
- for (String itemStr : brokerTopicSetConfInfo) {
- capacity += itemStr.length();
- }
- capacity *= 2;
- for (int i = 1; i < 3; i++) {
- result = inCalcBufferResult(capacity, brokerDefaultConfInfo, brokerTopicSetConfInfo);
- if (result >= 0) {
- return result;
- }
- capacity *= i + 1;
- }
- logger.error("Calc BrokerConfigure Crc error!");
- return 0;
- }
-
- private int inCalcBufferResult(int capacity, final String brokerDefaultConfInfo,
- final List<String> brokerTopicSetConfInfo) {
- final ByteBuffer buffer = ByteBuffer.allocate(capacity);
- buffer.put(StringUtils.getBytesUtf8(brokerDefaultConfInfo));
- for (String itemStr : brokerTopicSetConfInfo) {
- byte[] itemData = StringUtils.getBytesUtf8(itemStr);
- if (itemData.length > buffer.remaining()) {
- return -1;
- }
- buffer.put(itemData);
- }
- return CheckSum.crc32(buffer.array());
- }
-
- /**
- * Update broker config, field will set to default value if brokerDefaultConfInfo is empty,
- * else will parse the string value and then set broker config
- *
- * @param brokerDefaultConfInfo a string, field join with ":",
- * @param brokerTopicSetConfInfo
- */
- private void updateBrokerConfigureInfo(String brokerDefaultConfInfo,
- List<String> brokerTopicSetConfInfo) {
- int crc32CheckSum =
- calculateConfigCrc32Value(brokerDefaultConfInfo, brokerTopicSetConfInfo);
- if (crc32CheckSum != this.currBrokerCheckSumId) {
- this.currBrokerCheckSumId = crc32CheckSum;
- this.curBrokerTopicSetConfInfo = brokerTopicSetConfInfo;
- this.curBrokerDefaultConfInfo = brokerDefaultConfInfo;
- if (TStringUtils.isBlank(brokerDefaultConfInfo)) {
- this.numPartitions = 1;
- this.numTopicStores = 1;
- this.unflushThreshold = 1000;
- this.unflushInterval = 10000;
- this.unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
- this.deletePolicy = "delete,168h";
- this.deleteWhen = "0 0 6,18 * * ?";
- this.acceptPublish = true;
- this.acceptSubscribe = true;
- this.memCacheFlushIntvl = 20000;
- this.memCacheMsgCntInK = 10;
- this.memCacheMsgSizeInMB = 3;
- } else {
- String[] brokerDefaultConfInfoArr =
- brokerDefaultConfInfo.split(TokenConstants.ATTR_SEP);
- this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
- this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
- this.acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
- this.unflushThreshold = Integer.parseInt(brokerDefaultConfInfoArr[3]);
- this.unflushInterval = Integer.parseInt(brokerDefaultConfInfoArr[4]);
- this.deleteWhen = brokerDefaultConfInfoArr[5];
- this.deletePolicy = brokerDefaultConfInfoArr[6];
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
- this.numTopicStores = Integer.parseInt(brokerDefaultConfInfoArr[7]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
- this.unFlushDataHold = Integer.parseInt(brokerDefaultConfInfoArr[8]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[9])) {
- this.memCacheMsgSizeInMB = Integer.parseInt(brokerDefaultConfInfoArr[9]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[10])) {
- this.memCacheMsgCntInK = Integer.parseInt(brokerDefaultConfInfoArr[10]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[11])) {
- this.memCacheFlushIntvl = Integer.parseInt(brokerDefaultConfInfoArr[11]);
- }
- }
- }
- }
-
- public long getLastPushBrokerConfId() {
- return lastPushBrokerConfId;
- }
-
- public void setLastPushBrokerConfId(long lastPushBrokerConfId) {
- this.lastPushBrokerConfId = lastPushBrokerConfId;
- }
-
- public int getLastPushBrokerCheckSumId() {
- return lastPushBrokerCheckSumId;
- }
-
- public void setLastPushBrokerCheckSumId(int lastPushBrokerCheckSumId) {
- this.lastPushBrokerCheckSumId = lastPushBrokerCheckSumId;
- }
-
- public long getLastDataPushInMills() {
- return lastDataPushInMills;
- }
-
- public void setLastDataPushInMills(long lastDataPushInMills) {
- this.lastDataPushInMills = lastDataPushInMills;
- }
-
- public String getLastPushBrokerDefaultConfInfo() {
- return lastPushBrokerDefaultConfInfo;
- }
-
- public void setLastPushBrokerDefaultConfInfo(String lastPushBrokerDefaultConfInfo) {
- this.lastPushBrokerDefaultConfInfo = lastPushBrokerDefaultConfInfo;
- }
-
- public List<String> getLastPushBrokerTopicSetConfInfo() {
- return lastPushBrokerTopicSetConfInfo;
- }
-
- public void setLastPushBrokerTopicSetConfInfo(List<String> lastPushBrokerTopicSetConfInfo) {
- this.lastPushBrokerTopicSetConfInfo = lastPushBrokerTopicSetConfInfo;
- }
-
- public String getReportedBrokerDefaultConfInfo() {
- return reportedBrokerDefaultConfInfo;
- }
-
- public void setReportedBrokerDefaultConfInfo(String reportedBrokerDefaultConfInfo) {
- this.reportedBrokerDefaultConfInfo = reportedBrokerDefaultConfInfo;
- }
-
- public List<String> getReportedBrokerTopicSetConfInfo() {
- return reportedBrokerTopicSetConfInfo;
- }
-
- public void setReportedBrokerTopicSetConfInfo(List<String> reportedBrokerTopicSetConfInfo) {
- this.reportedBrokerTopicSetConfInfo = reportedBrokerTopicSetConfInfo;
- }
-
- public boolean isBrokerConfChanged() {
- return isBrokerConfChanged;
- }
-
- public void setBrokerConfChanged() {
- this.isBrokerConfChanged = true;
- this.isBrokerLoaded = false;
- }
-
- public boolean isBrokerLoaded() {
- return isBrokerLoaded;
- }
-
- public void setBrokerLoaded() {
- this.isBrokerLoaded = true;
- this.isBrokerConfChanged = false;
- }
-
- // #lizard forgives
- private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo,
- String brokerJsonKey,
- List<String> topicConfInfoList,
- String topicListJsonKey,
- final StringBuilder strBuffer) {
- // format config to json
- strBuffer.append(",\"").append(brokerJsonKey).append("\":");
- if (TStringUtils.isBlank(brokerConfInfo)) {
- strBuffer.append("{},\"").append(topicListJsonKey).append("\":[]");
- return strBuffer;
- }
- // broker default metadata
- String[] brokerDefaultConfInfoArr =
- brokerConfInfo.split(TokenConstants.ATTR_SEP);
- final int numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
- final boolean acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
- final boolean acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
- final int unflushThreshold = Integer.parseInt(brokerDefaultConfInfoArr[3]);
- final int unflushInterval = Integer.parseInt(brokerDefaultConfInfoArr[4]);
- final String deleteWhen = brokerDefaultConfInfoArr[5];
- final String deletePolicy = brokerDefaultConfInfoArr[6];
- int numTopicStores = 1;
- int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
- int memCacheMsgSizeInMB = 3;
- int memCacheMsgCntInK = 10;
- int memCacheFlushIntvl = 20000;
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[7])) {
- numTopicStores =
- Integer.parseInt(brokerDefaultConfInfoArr[7]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[8])) {
- unFlushDataHold =
- Integer.parseInt(brokerDefaultConfInfoArr[8]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[9])) {
- memCacheMsgSizeInMB =
- Integer.parseInt(brokerDefaultConfInfoArr[9]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[10])) {
- memCacheMsgCntInK =
- Integer.parseInt(brokerDefaultConfInfoArr[10]);
- }
- if (!TStringUtils.isBlank(brokerDefaultConfInfoArr[11])) {
- memCacheFlushIntvl =
- Integer.parseInt(brokerDefaultConfInfoArr[11]);
- }
- //format broker config to json
- strBuffer.append("{\"numPartitions\":").append(numPartitions)
- .append(",\"acceptPublish\":").append(acceptPublish)
- .append(",\"acceptSubscribe\":").append(acceptSubscribe)
- .append(",\"unflushThreshold\":").append(unflushThreshold)
- .append(",\"unflushInterval\":").append(unflushInterval)
- .append(",\"deleteWhen\":\"").append(deleteWhen)
- .append("\",\"deletePolicy\":\"").append(deletePolicy)
- .append("\",\"numTopicStores\":").append(numTopicStores)
- .append(",\"unflushDataHold\":").append(unFlushDataHold)
- .append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB)
- .append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK)
- .append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl)
- .append("}");
- strBuffer.append(",\"").append(topicListJsonKey).append("\":[");
- if (topicConfInfoList == null
- || topicConfInfoList.isEmpty()) {
- strBuffer.append("]");
- return strBuffer;
- }
- // topic config metadata in the broker
- // format topic metadata
- int count = 0;
- for (String strTopicConfInfo : topicConfInfoList) {
- if (TStringUtils.isBlank(strTopicConfInfo)) {
- continue;
- }
- String[] topicConfInfoArr =
- strTopicConfInfo.split(TokenConstants.ATTR_SEP);
- if (count++ > 0) {
- strBuffer.append(",");
- }
- strBuffer.append("{\"topicName\":\"").append(topicConfInfoArr[0]).append("\"");
- int tmpPartNum = numPartitions;
- if (!TStringUtils.isBlank(topicConfInfoArr[1])) {
- tmpPartNum = Integer.parseInt(topicConfInfoArr[1]);
- }
- strBuffer.append(",\"numPartitions\":").append(tmpPartNum);
- boolean tmpAcceptPublish = acceptPublish;
- if (!TStringUtils.isBlank(topicConfInfoArr[2])) {
- tmpAcceptPublish = Boolean.parseBoolean(topicConfInfoArr[2]);
- }
- strBuffer.append(",\"acceptPublish\":").append(tmpAcceptPublish);
- boolean tmpAcceptSubscribe = acceptSubscribe;
- if (!TStringUtils.isBlank(topicConfInfoArr[3])) {
- tmpAcceptSubscribe = Boolean.parseBoolean(topicConfInfoArr[3]);
- }
- strBuffer.append(",\"acceptSubscribe\":").append(tmpAcceptSubscribe);
- int tmpUnflushThreshold = unflushThreshold;
- if (!TStringUtils.isBlank(topicConfInfoArr[4])) {
- tmpUnflushThreshold = Integer.parseInt(topicConfInfoArr[4]);
- }
- strBuffer.append(",\"unflushThreshold\":").append(tmpUnflushThreshold);
- int tmpUnflushInterval = unflushInterval;
- if (!TStringUtils.isBlank(topicConfInfoArr[5])) {
- tmpUnflushInterval = Integer.parseInt(topicConfInfoArr[5]);
- }
- strBuffer.append(",\"unflushInterval\":").append(tmpUnflushInterval);
- String tmpDeleteWhen = deleteWhen;
- if (!TStringUtils.isBlank(topicConfInfoArr[6])) {
- tmpDeleteWhen = topicConfInfoArr[6];
- }
- strBuffer.append(",\"deleteWhen\":\"").append(tmpDeleteWhen).append("\"");
- String tmpDeletePolicy = deletePolicy;
- if (!TStringUtils.isBlank(topicConfInfoArr[7])) {
- tmpDeletePolicy = topicConfInfoArr[7];
- }
- int tmpNumTopicStores = numTopicStores;
- if (!TStringUtils.isBlank(topicConfInfoArr[8])) {
- tmpNumTopicStores = Integer.parseInt(topicConfInfoArr[8]);
- }
- strBuffer.append(",\"numTopicStores\":").append(tmpNumTopicStores);
- strBuffer.append(",\"deletePolicy\":\"").append(tmpDeletePolicy).append("\"");
- int topicStatusId = TStatusConstants.STATUS_TOPIC_OK;
- if (!TStringUtils.isBlank(topicConfInfoArr[9])) {
- topicStatusId = Integer.parseInt(topicConfInfoArr[9]);
- }
- int tmpunFlushDataHold = unFlushDataHold;
- if (!TStringUtils.isBlank(topicConfInfoArr[10])) {
- tmpunFlushDataHold = Integer.parseInt(topicConfInfoArr[10]);
- }
- strBuffer.append(",\"unflushDataHold\":").append(tmpunFlushDataHold);
- int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB;
- int tmpmemCacheMsgCntInK = memCacheMsgCntInK;
- int tmpmemCacheFlushIntvl = memCacheFlushIntvl;
- int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize();
- int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
- if (!TStringUtils.isBlank(topicConfInfoArr[11])) {
- tmpmemCacheMsgSizeInMB = Integer.parseInt(topicConfInfoArr[11]);
- }
- if (!TStringUtils.isBlank(topicConfInfoArr[12])) {
- tmpmemCacheMsgCntInK = Integer.parseInt(topicConfInfoArr[12]);
- }
- if (!TStringUtils.isBlank(topicConfInfoArr[13])) {
- tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
- }
- if (topicConfInfoArr.length > 14) {
- if (!TStringUtils.isNotBlank(topicConfInfoArr[14])) {
- tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
- Tuple2<Integer, Integer> calcResult =
- ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize);
- tmpMaxMsgSize = calcResult.getF0();
- tmpMinMemCacheSize = calcResult.getF1();
- }
- }
- strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB);
- strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK);
- strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl);
- strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize);
- strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize);
- strBuffer.append(",\"topicStatusId\":").append(topicStatusId);
- strBuffer.append("}");
- }
- strBuffer.append("]");
- return strBuffer;
- }
-
- /* Format to json */
- public StringBuilder toJsonString(StringBuilder strBuffer, boolean isOrig) {
- strBuffer.append("\"BrokerSyncStatusInfo\":{\"type\":\"BrokerSyncStatusInfo\",\"brokerId\":").append(brokerId)
- .append(",\"brokerAddress\":\"").append(brokerIp).append(":").append(brokerPort)
- .append("\",\"brokerManageStatus\":").append(brokerManageStatus)
- .append(",\"brokerRunStatus\":").append(brokerRunStatus)
- .append(",\"subStepOpTimeInMills\":").append(subStepOpTimeInMills)
- .append(",\"lastDataReportInMills\":").append(lastDataReportInMills)
- .append(",\"isBrokerRegister\":").append(isBrokerRegister)
- .append(",\"isBrokerOnline\":").append(isBrokerOnline)
- .append(",\"isFirstInit\":").append(isFirstInit)
- .append(",\"isBrokerConfChanged\":").append(isBrokerConfChanged)
- .append(",\"isBrokerLoaded\":").append(isBrokerLoaded)
- .append(",\"isFastStart\":").append(isFastStart);
- if (isOrig) {
- strBuffer.append(",\"currBrokerConfId\":").append(currBrokerConfId.get())
- .append(",\"currBrokerCheckSumId\":").append(currBrokerCheckSumId)
- .append(",\"curBrokerDefaultConfInfo\":\"").append(curBrokerDefaultConfInfo)
- .append("\",\"curBrokerTopicSetConfInfo\":\"").append(curBrokerTopicSetConfInfo.toString())
- .append("\",\"lastPushBrokerConfId\":").append(lastPushBrokerConfId)
- .append(",\"lastPushBrokerCheckSumId\":").append(lastPushBrokerCheckSumId)
- .append(",\"lastPushBrokerDefaultConfInfo\":\"").append(lastPushBrokerDefaultConfInfo)
- .append("\",\"lastPushBrokerTopicSetConfInfo\":\"")
- .append(lastPushBrokerTopicSetConfInfo.toString())
- .append(",\"reportedBrokerConfId\":").append(reportedBrokerConfId)
- .append(",\"reportedBrokerCheckSumId\":").append(reportedBrokerCheckSumId)
- .append(",\"reportedBrokerDefaultConfInfo\":\"").append(reportedBrokerDefaultConfInfo)
- .append("\",\"reportedBrokerTopicSetConfInfo\":\"")
- .append(reportedBrokerTopicSetConfInfo.toString())
- .append("}");
- } else {
- strBuffer.append(",\"currBrokerConfId\":").append(currBrokerConfId.get())
- .append(",\"currBrokerCheckSumId\":").append(currBrokerCheckSumId);
- strBuffer = getBrokerAndTopicConfJsonInfo(curBrokerDefaultConfInfo,
- "curBrokerDefaultConfInfo",
- curBrokerTopicSetConfInfo,
- "curBrokerTopicSetConfInfo",
- strBuffer);
- strBuffer.append(",\"lastPushBrokerConfId\":").append(lastPushBrokerConfId)
- .append(",\"lastPushBrokerCheckSumId\":").append(lastPushBrokerCheckSumId);
- strBuffer = getBrokerAndTopicConfJsonInfo(lastPushBrokerDefaultConfInfo,
- "lastPushBrokerDefaultConfInfo",
- lastPushBrokerTopicSetConfInfo,
- "lastPushBrokerTopicSetConfInfo",
- strBuffer);
- strBuffer.append(",\"reportedBrokerConfId\":").append(reportedBrokerConfId)
- .append(",\"reportedBrokerCheckSumId\":").append(reportedBrokerCheckSumId);
- strBuffer = getBrokerAndTopicConfJsonInfo(reportedBrokerDefaultConfInfo,
- "reportedBrokerDefaultConfInfo",
- reportedBrokerTopicSetConfInfo,
- "reportedBrokerTopicSetConfInfo",
- strBuffer);
- strBuffer.append("}");
- }
- return strBuffer;
- }
-
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index ddfca1e..121a82f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
@@ -32,9 +31,13 @@ import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
+
+
public class BrokerTopicInfoView {
- private ConcurrentHashMap<String/* topicName */, TopicInfoView>
- topicConfInfoMap = new ConcurrentHashMap<>();
+ public AtomicLong topicChangeId = new AtomicLong(0);
+ private ConcurrentHashMap<String/* topicName */,
+ ConcurrentHashMap<Integer/* brokerId */, TopicInfo>> topicConfInfoMap =
+ new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashSet<String/* topicName */>>
brokerIdIndexMap = new ConcurrentHashMap<>();
@@ -44,7 +47,7 @@ public class BrokerTopicInfoView {
// remove broker all topic info
public void rmvBrokerTopicInfo(int brokerId) {
- TopicInfoView topicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
// remove pub info
ConcurrentHashSet<String> topicSet =
brokerIdIndexMap.remove(brokerId);
@@ -57,11 +60,12 @@ public class BrokerTopicInfoView {
}
topicInfoView = topicConfInfoMap.get(topic);
if (topicInfoView == null
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
continue;
}
- topicInfoView.rmvBrokerTopicInfo(brokerId);
+ topicInfoView.remove(brokerId);
}
+ topicChangeId.set(System.currentTimeMillis());
}
/**
@@ -72,8 +76,7 @@ public class BrokerTopicInfoView {
* if topicInfoMap is null, reserve current configure;
* if topicInfoMap is empty, clear current configure.
*/
- public void updBrokerTopicConfInfo(int brokerId,
- Map<String, TopicInfo> topicInfoMap) {
+ public void updBrokerTopicConfInfo(int brokerId, Map<String, TopicInfo> topicInfoMap) {
if (topicInfoMap == null) {
return;
}
@@ -90,6 +93,7 @@ public class BrokerTopicInfoView {
rmvBrokerTopicInfo(brokerId, delTopicSet);
// add or update TopicInfo
repBrokerTopicInfo(brokerId, topicInfoMap);
+ topicChangeId.set(System.currentTimeMillis());
}
/**
@@ -98,9 +102,9 @@ public class BrokerTopicInfoView {
* @param topicSet need query topic set
*/
public int getMaxTopicBrokerCnt(Set<String> topicSet) {
- int maxCount = -1;
int tmpSize;
- TopicInfoView topicInfoView;
+ int maxCount = -1;
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
if (topicSet == null || topicSet.isEmpty()) {
return maxCount;
}
@@ -110,10 +114,10 @@ public class BrokerTopicInfoView {
}
topicInfoView = topicConfInfoMap.get(topic);
if (topicInfoView == null
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
continue;
}
- tmpSize = topicInfoView.curMapSize.get();
+ tmpSize = topicInfoView.size();
if (maxCount < tmpSize) {
maxCount = tmpSize;
}
@@ -122,20 +126,24 @@ public class BrokerTopicInfoView {
}
/**
- * Gets the list of topic partitions whose subscribe status is enabled
+ * Gets the map of topic partitions whose subscribe status is enabled
*
* @param topicSet need query topic set
*/
- public List<Partition> getAcceptSubParts(Set<String> topicSet,
+ public Map<String, Partition> getAcceptSubParts(Set<String> topicSet,
Set<Integer> enableSubBrokerIdSet) {
- List<Partition> partList = new ArrayList<>();
+ Map<String, Partition> partMap = new HashMap<>();
if (topicSet == null || topicSet.isEmpty()) {
- return partList;
+ return partMap;
}
+ List<Partition> tmpPartList;
for (String topic : topicSet) {
- partList.addAll(getAcceptSubParts(topic, enableSubBrokerIdSet));
+ tmpPartList = getAcceptSubParts(topic, enableSubBrokerIdSet);
+ for (Partition partition : tmpPartList) {
+ partMap.put(partition.getPartitionKey(), partition);
+ }
}
- return partList;
+ return partMap;
}
/**
@@ -143,20 +151,20 @@ public class BrokerTopicInfoView {
*
* @param topic need query topic set
*/
- public List<Partition> getAcceptSubParts(String topic,
- Set<Integer> enableSubBrokerIdSet) {
+ public List<Partition> getAcceptSubParts(String topic, Set<Integer> enableSubBrokerIdSet) {
+ Partition tmpPart;
TopicInfo topicInfo;
List<Partition> partList = new ArrayList<>();
if (topic == null) {
return partList;
}
- TopicInfoView topicInfoView = topicConfInfoMap.get(topic);
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView =
+ topicConfInfoMap.get(topic);
if (topicInfoView == null
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
return partList;
}
- for (Map.Entry<Integer, TopicInfo> entry
- : topicInfoView.brokerTopicInfoMap.entrySet()) {
+ for (Map.Entry<Integer, TopicInfo> entry : topicInfoView.entrySet()) {
if (entry.getKey() == null
|| entry.getValue() == null
|| !enableSubBrokerIdSet.contains(entry.getKey())) {
@@ -184,21 +192,22 @@ public class BrokerTopicInfoView {
public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet,
Set<Integer> enablePubBrokerIdSet) {
TopicInfo topicInfo;
- TopicInfoView topicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
Map<String, String> topicPartStrMap = new HashMap<>();
- Map<String, StringBuilder> topicPartBufferMap =
- new HashMap<>();
+ Map<String, StringBuilder> topicPartBufferMap = new HashMap<>();
+ if (topicSet == null || topicSet.isEmpty()) {
+ return topicPartStrMap;
+ }
for (String topic : topicSet) {
if (topic == null) {
continue;
}
topicInfoView = topicConfInfoMap.get(topic);
if (topicInfoView == null
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
continue;
}
- for (Map.Entry<Integer, TopicInfo> entry
- : topicInfoView.brokerTopicInfoMap.entrySet()) {
+ for (Map.Entry<Integer, TopicInfo> entry : topicInfoView.entrySet()) {
if (entry.getKey() == null
|| entry.getValue() == null
|| !enablePubBrokerIdSet.contains(entry.getKey())) {
@@ -238,11 +247,12 @@ public class BrokerTopicInfoView {
* @return null or topicInfo configure
*/
public TopicInfo getBrokerPushedTopicInfo(int brokerId, String topic) {
- TopicInfoView topicInfoView = topicConfInfoMap.get(topic);
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView =
+ topicConfInfoMap.get(topic);
if (topicInfoView == null) {
return null;
}
- return topicInfoView.brokerTopicInfoMap.get(brokerId);
+ return topicInfoView.get(brokerId);
}
/**
@@ -252,7 +262,7 @@ public class BrokerTopicInfoView {
*/
public List<TopicInfo> getBrokerPushedTopicInfo(int brokerId) {
TopicInfo topicInfo;
- TopicInfoView topicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
List<TopicInfo> topicInfoList = new ArrayList<>();
ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
if (topicSet == null) {
@@ -264,10 +274,10 @@ public class BrokerTopicInfoView {
}
topicInfoView = topicConfInfoMap.get(topic);
if (topicInfoView == null
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
continue;
}
- topicInfo = topicInfoView.brokerTopicInfoMap.get(brokerId);
+ topicInfo = topicInfoView.get(brokerId);
if (topicInfo == null) {
continue;
}
@@ -282,9 +292,8 @@ public class BrokerTopicInfoView {
if (delTopicSet == null || delTopicSet.isEmpty()) {
return;
}
- ConcurrentHashSet<String> topicSet =
- brokerIdIndexMap.get(brokerId);
- TopicInfoView topicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
+ ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
if (topicSet == null || topicSet.isEmpty()) {
return;
}
@@ -292,10 +301,10 @@ public class BrokerTopicInfoView {
topicSet.remove(topic);
topicInfoView = topicConfInfoMap.get(topic);
if ((topicInfoView == null)
- || topicInfoView.brokerTopicInfoMap.isEmpty()) {
+ || topicInfoView.isEmpty()) {
continue;
}
- topicInfoView.rmvBrokerTopicInfo(brokerId);
+ topicInfoView.remove(brokerId);
}
}
@@ -306,22 +315,22 @@ public class BrokerTopicInfoView {
return;
}
// add topic info
- TopicInfoView newTopicInfoView;
- TopicInfoView curTopicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> newTopicInfoView;
+ ConcurrentHashMap<Integer, TopicInfo> curTopicInfoView;
for (TopicInfo topicInfo : topicInfoMap.values()) {
if (topicInfo == null) {
continue;
}
curTopicInfoView = topicConfInfoMap.get(topicInfo.getTopic());
if (curTopicInfoView == null) {
- newTopicInfoView = new TopicInfoView();
+ newTopicInfoView = new ConcurrentHashMap<Integer, TopicInfo>();
curTopicInfoView = topicConfInfoMap.putIfAbsent(
topicInfo.getTopic(), newTopicInfoView);
if (curTopicInfoView == null) {
curTopicInfoView = newTopicInfoView;
}
}
- curTopicInfoView.addOrUpdBrokerTopicInfo(brokerId, topicInfo);
+ curTopicInfoView.put(brokerId, topicInfo.clone());
}
// add broker index
ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
@@ -335,38 +344,4 @@ public class BrokerTopicInfoView {
curTopicSet.addAll(topicInfoMap.keySet());
}
- private static class TopicInfoView {
- public AtomicInteger curMapSize = new AtomicInteger(0);
- public AtomicLong topicChangeId = new AtomicLong(0);
- public ConcurrentHashMap<Integer/* brokerId */, TopicInfo> brokerTopicInfoMap =
- new ConcurrentHashMap<Integer/* brokerId */, TopicInfo>();
-
- public TopicInfoView() {
-
- }
-
- public boolean rmvBrokerTopicInfo(int brokerId) {
- TopicInfo topicInfo = brokerTopicInfoMap.remove(brokerId);
- if (topicInfo == null) {
- return false;
- }
- curMapSize.decrementAndGet();
- topicChangeId.set(System.currentTimeMillis());
- return true;
- }
-
- public boolean addOrUpdBrokerTopicInfo(int brokerId, TopicInfo topicInfo) {
- if (topicInfo == null) {
- return false;
- }
- TopicInfo newTopicInfo = topicInfo.clone();
- TopicInfo preTopicInfo =
- brokerTopicInfoMap.put(brokerId, newTopicInfo);
- if (preTopicInfo == null) {
- curMapSize.incrementAndGet();
- }
- topicChangeId.set(System.currentTimeMillis());
- return true;
- }
- }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
new file mode 100644
index 0000000..44f5c36
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.nodemanage.nodebroker;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.tubemq.corebase.cluster.Partition;
+import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.HeartResponseM2B;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.RegisterResponseM2B;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.corebase.utils.Tuple4;
+import org.apache.tubemq.server.common.heartbeat.HeartbeatManager;
+import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
+import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
+import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.MasterConfig;
+import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.MetaDataManager;
+import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class DefBrokerRunManager implements BrokerRunManager {
+ private static final Logger logger =
+ LoggerFactory.getLogger(DefBrokerRunManager.class);
+ // meta data manager
+ private final MetaDataManager metaDataManager;
+ private final HeartbeatManager heartbeatManager;
+ // broker string info
+ private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
+ private long lastBrokerUpdatedTime = System.currentTimeMillis();
+ private final ConcurrentHashMap<Integer, String> brokersMap =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, String> brokersTLSMap =
+ new ConcurrentHashMap<>();
+
+ // broker sync FSM
+ private AtomicInteger brokerTotalCount = new AtomicInteger(0);
+ private ConcurrentHashMap<Integer/* brokerId */, BrokerRunStatusInfo> brokerRunSyncManageMap =
+ new ConcurrentHashMap<>();
+ // broker abnormal holder
+ private final BrokerAbnHolder brokerAbnHolder;
+ // broker topic configure for consumer and producer
+ private BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+
+
+ public DefBrokerRunManager(TMaster tMaster) {
+ this.metaDataManager = tMaster.getDefMetaDataManager();
+ this.heartbeatManager = tMaster.getHeartbeatManager();
+ MasterConfig masterConfig = tMaster.getMasterConfig();
+ this.brokerAbnHolder =
+ new BrokerAbnHolder(masterConfig.getMaxAutoForbiddenCnt(),
+ this.metaDataManager);
+ heartbeatManager.regBrokerCheckBusiness(masterConfig.getBrokerHeartbeatTimeoutMs(),
+ new TimeoutListener() {
+ @Override
+ public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) throws Exception {
+ logger.info(new StringBuilder(512).append("[Broker Timeout] ")
+ .append(nodeId).toString());
+ releaseBrokerRunInfo(Integer.parseInt(nodeId), nodeInfo.getSecondKey());
+ }
+ });
+ }
+
+ @Override
+ public Tuple2<Long, Map<Integer, String>> getBrokerStaticInfo(boolean isOverTLS) {
+ if (isOverTLS) {
+ return new Tuple2<>(brokerInfoCheckSum.get(), brokersTLSMap);
+ } else {
+ return new Tuple2<>(brokerInfoCheckSum.get(), brokersMap);
+ }
+ }
+
+ @Override
+ public void updBrokerStaticInfo(Map<Integer, BrokerConfEntity> brokerConfMap) {
+ if (brokerConfMap == null || brokerConfMap.isEmpty()) {
+ return;
+ }
+ for (BrokerConfEntity entity : brokerConfMap.values()) {
+ updBrokerStaticInfo(entity);
+ }
+ }
+
+ @Override
+ public void updBrokerStaticInfo(BrokerConfEntity entity) {
+ if (entity == null) {
+ return;
+ }
+ String brokerReg =
+ this.brokersMap.putIfAbsent(entity.getBrokerId(),
+ entity.getSimpleBrokerInfo());
+ String brokerTLSReg =
+ this.brokersTLSMap.putIfAbsent(entity.getBrokerId(),
+ entity.getSimpleTLSBrokerInfo());
+ if (brokerReg == null
+ || brokerTLSReg == null
+ || !brokerReg.equals(entity.getSimpleBrokerInfo())
+ || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
+ if (brokerReg != null
+ && !brokerReg.equals(entity.getSimpleBrokerInfo())) {
+ this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo());
+ }
+ if (brokerTLSReg != null
+ && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
+ this.brokersTLSMap.put(entity.getBrokerId(), entity.getSimpleTLSBrokerInfo());
+ }
+ this.brokerInfoCheckSum.set(System.currentTimeMillis());
+ }
+ }
+
+ @Override
+ public void delBrokerStaticInfo(int brokerId) {
+ if (brokerId == TBaseConstants.META_VALUE_UNDEFINED) {
+ return;
+ }
+ String brokerReg = this.brokersMap.remove(brokerId);
+ String brokerTLSReg = this.brokersTLSMap.remove(brokerId);
+ if (brokerReg != null || brokerTLSReg != null) {
+ this.brokerInfoCheckSum.set(System.currentTimeMillis());
+ }
+ }
+
+ @Override
+ public Tuple2<Boolean, Boolean> getBrokerPublishStatus(int brokerId) {
+ return brokerPubSubInfo.getBrokerPubStatus(brokerId);
+ }
+
+
+ @Override
+ public BrokerAbnHolder getBrokerAbnHolder() {
+ return this.brokerAbnHolder;
+ }
+
+ @Override
+ public boolean brokerRegister2M(String clientId, BrokerInfo brokerInfo,
+ long reportConfigId, int reportCheckSumId,
+ boolean isTackData, String repBrokerConfInfo,
+ List<String> repTopicConfInfo, boolean isOnline,
+ boolean isOverTLS, StringBuilder sBuffer,
+ ProcessResult result) {
+ BrokerConfEntity brokerEntry =
+ metaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
+ if (brokerEntry == null) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Not found broker configure info, please create first!")
+ .append(" the connecting client id is:")
+ .append(clientId).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if ((!brokerInfo.getHost().equals(brokerEntry.getBrokerIp()))
+ || (brokerInfo.getPort() != brokerEntry.getBrokerPort())) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Inconsistent broker configure,please confirm first!")
+ .append(" the connecting client id is:").append(clientId)
+ .append(", the configure's broker address by brokerId is:")
+ .append(brokerEntry.getBrokerIdAndAddress()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ int confTLSPort = brokerEntry.getBrokerTLSPort();
+ if (confTLSPort != brokerInfo.getTlsPort()) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Inconsistent TLS configure, please confirm first!")
+ .append(" the connecting client id is:").append(clientId)
+ .append(", the configured TLS port is:").append(confTLSPort)
+ .append(", the broker reported TLS port is ")
+ .append(brokerInfo.getTlsPort()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (brokerEntry.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Broker's configure not online, please online configure first!")
+ .append(" the connecting client id is:").append(clientId).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ String brokerConfInfo =
+ brokerEntry.getBrokerDefaultConfInfo();
+ Map<String, String> topicConfInfoMap =
+ metaDataManager.getBrokerTopicStrConfigInfo(brokerEntry, sBuffer);
+ //
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerInfo.getBrokerId());
+ if (runStatusInfo == null) {
+ BrokerRunStatusInfo tmpRunStatusInfo =
+ new BrokerRunStatusInfo(this, brokerInfo,
+ brokerEntry.getManageStatus(), brokerConfInfo,
+ topicConfInfoMap, isOverTLS);
+ runStatusInfo =
+ brokerRunSyncManageMap.putIfAbsent(
+ brokerInfo.getBrokerId(), tmpRunStatusInfo);
+ if (runStatusInfo == null) {
+ brokerTotalCount.incrementAndGet();
+ runStatusInfo = tmpRunStatusInfo;
+ }
+ } else {
+ runStatusInfo.reInitRunStatusInfo(brokerInfo,
+ brokerEntry.getManageStatus(), brokerConfInfo,
+ topicConfInfoMap, isOverTLS);
+ }
+ runStatusInfo.bookBrokerReportInfo(true, isOnline, reportConfigId,
+ reportCheckSumId, isTackData, repBrokerConfInfo, repTopicConfInfo, sBuffer);
+ heartbeatManager.regBrokerNode(String.valueOf(brokerInfo.getBrokerId()),
+ runStatusInfo.getCreateId());
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean brokerHeartBeat2M(int brokerId, long reportConfigId, int reportCheckSumId,
+ boolean isTackData, String repBrokerConfInfo,
+ List<String> repTopicConfInfo,
+ boolean isTackRmvInfo, List<String> removedTopics,
+ int rptReadStatus, int rptWriteStatus, boolean isOnline,
+ StringBuilder sBuffer, ProcessResult result) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ result.setFailResult(TErrCodeConstants.HB_NO_NODE, sBuffer
+ .append("Not found Broker run status info, please register broker first!")
+ .append(" the connecting client id is:").append(brokerId).toString());
+ return result.isSuccess();
+ }
+ // update heartbeat
+ if (!heartbeatManager.updBrokerNode(String.valueOf(brokerId),
+ runStatusInfo.getCreateId(), sBuffer, result)) {
+ return result.isSuccess();
+ }
+ // update broker status
+ runStatusInfo.bookBrokerReportInfo(false, isOnline, reportConfigId,
+ reportCheckSumId, isTackData, repBrokerConfInfo, repTopicConfInfo, sBuffer);
+ // process removed topic info
+ if (isTackRmvInfo) {
+ metaDataManager.clearRmvedTopicConfInfo(brokerId,
+ removedTopics, sBuffer, result);
+ logger.info(sBuffer.append("[Broker Report] receive broker removed topics = ")
+ .append(removedTopics.toString()).append(", removed result is ")
+ .append(result.getErrInfo()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ brokerAbnHolder.updateBrokerReportStatus(brokerId, rptReadStatus, rptWriteStatus);
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean brokerClose2M(int brokerId, StringBuilder sBuffer, ProcessResult result) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ result.setFailResult(TErrCodeConstants.HB_NO_NODE, sBuffer
+ .append("Not found Broker run status info, please register broker first!")
+ .append(" the connecting client id is:").append(brokerId).toString());
+ return result.isSuccess();
+ }
+ if (!heartbeatManager.unRegBrokerNode(String.valueOf(brokerId),
+ runStatusInfo.getCreateId())) {
+ logger.info(sBuffer.append("[Broker Closed] brokerId=").append(brokerId)
+ .append(" unregister failure, run-info has been replaced by new request!")
+ .toString());
+ return result.isSuccess();
+ }
+ boolean isOverTls = runStatusInfo.isOverTLS();
+ releaseBrokerRunInfo(brokerId, runStatusInfo.getCreateId());
+ logger.info(sBuffer.append("[Broker Closed]").append(brokerId)
+ .append(" unregister success, isOverTLS=").append(isOverTls).toString());
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+
+ @Override
+ public Tuple3<ManageStatus, String, Map<String, String>> getBrokerMetaConfigInfo(int brokerId) {
+ String brokerConfInfo = null;
+ ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
+ StringBuilder sBuffer = new StringBuilder(512);
+ BrokerConfEntity brokerConfEntity =
+ metaDataManager.getBrokerConfByBrokerId(brokerId);
+ if (brokerConfEntity != null) {
+ brokerConfInfo = brokerConfEntity.getBrokerDefaultConfInfo();
+ manageStatus = brokerConfEntity.getManageStatus();
+ }
+ Map<String, String> brokerTopicSetConfInfo =
+ this.metaDataManager.getBrokerTopicStrConfigInfo(brokerConfEntity, sBuffer);
+ return new Tuple3<>(manageStatus, brokerConfInfo, brokerTopicSetConfInfo);
+ }
+
+ @Override
+ public void setRegisterDownConfInfo(int brokerId, StringBuilder sBuffer,
+ RegisterResponseM2B.Builder builder) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ logger.info(sBuffer.append("Get Broker run-info failure, brokerId=")
+ .append(brokerId).append(", please check the implement first!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return;
+ }
+ Tuple4<Long, Integer, String, List<String>> retTuple =
+ runStatusInfo.getNeedSyncData();
+ builder.setCurBrokerConfId(retTuple.getF0());
+ builder.setConfCheckSumId(retTuple.getF1());
+ Tuple2<Boolean, Boolean> autoFbdTuple =
+ brokerAbnHolder.getBrokerAutoFbdStatus(brokerId);
+ builder.setStopWrite(autoFbdTuple.getF0());
+ builder.setStopRead(autoFbdTuple.getF1());
+ if (retTuple.getF2() == null) {
+ builder.setTakeConfInfo(false);
+ } else {
+ builder.setTakeConfInfo(true);
+ builder.setBrokerDefaultConfInfo(retTuple.getF2());
+ builder.addAllBrokerTopicSetConfInfo(retTuple.getF3());
+ logger.info(sBuffer.append("[TMaster sync] push broker configure: brokerId = ")
+ .append(brokerId).append(",configureId=").append(retTuple.getF0())
+ .append(",stopWrite=").append(builder.getStopWrite())
+ .append(",stopRead=").append(builder.getStopRead())
+ .append(",checksumId=").append(retTuple.getF1())
+ .append(",default configure is ").append(retTuple.getF2())
+ .append(",topic configure is ").append(retTuple.getF3()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+
+ @Override
+ public void setHeatBeatDownConfInfo(int brokerId, StringBuilder sBuffer,
+ HeartResponseM2B.Builder builder) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ logger.info(sBuffer.append("Get Broker run-info failure, brokerId=")
+ .append(brokerId).append(", please check the implement first!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return;
+ }
+ Tuple4<Long, Integer, String, List<String>> retTuple =
+ runStatusInfo.getNeedSyncData();
+ builder.setCurBrokerConfId(retTuple.getF0());
+ builder.setConfCheckSumId(retTuple.getF1());
+ Tuple2<Boolean, Boolean> autoFbdTuple =
+ brokerAbnHolder.getBrokerAutoFbdStatus(brokerId);
+ builder.setStopWrite(autoFbdTuple.getF0());
+ builder.setStopRead(autoFbdTuple.getF1());
+ if (retTuple.getF2() == null) {
+ builder.setTakeConfInfo(false);
+ } else {
+ builder.setTakeConfInfo(true);
+ builder.setBrokerDefaultConfInfo(retTuple.getF2());
+ builder.addAllBrokerTopicSetConfInfo(retTuple.getF3());
+ logger.info(sBuffer.append("[TMaster sync] heartbeat sync config: brokerId = ")
+ .append(brokerId).append(",configureId=").append(retTuple.getF0())
+ .append(",stopWrite=").append(builder.getStopWrite())
+ .append(",stopRead=").append(builder.getStopRead())
+ .append(",checksumId=").append(retTuple.getF1())
+ .append(",default configure is ").append(retTuple.getF2())
+ .append(",topic configure is ").append(retTuple.getF3()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+
+ @Override
+ public BrokerRunStatusInfo getBrokerRunStatusInfo(int brokerId) {
+ return this.brokerRunSyncManageMap.get(brokerId);
+ }
+
+ @Override
+ public BrokerInfo getBrokerInfo(int brokerId) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ return null;
+ }
+ return runStatusInfo.getBrokerInfo();
+ }
+
+ @Override
+ public Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds) {
+ Map<Integer, BrokerInfo> brokerInfoMap = new HashMap<>();
+ if (brokerIds == null || brokerIds.isEmpty()) {
+ for (BrokerRunStatusInfo runStatusInfo : brokerRunSyncManageMap.values()) {
+ if (runStatusInfo == null) {
+ continue;
+ }
+ BrokerInfo brokerInfo = runStatusInfo.getBrokerInfo();
+ brokerInfoMap.put(brokerInfo.getBrokerId(), brokerInfo);
+ }
+ } else {
+ for (Integer brokerId : brokerIds) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ continue;
+ }
+ brokerInfoMap.put(brokerId, runStatusInfo.getBrokerInfo());
+ }
+ }
+ return brokerInfoMap;
+ }
+
+ @Override
+ public boolean releaseBrokerRunInfo(int brokerId, String blockId) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunSyncManageMap.get(brokerId);
+ if (runStatusInfo == null) {
+ logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
+ .append(" release failure, run-info has deleted before!").toString());
+ return false;
+ }
+ if (!blockId.equals(runStatusInfo.getCreateId())) {
+ logger.info(sBuffer.append("[Broker Release] brokerId=").append(brokerId)
+ .append(" release failure, run-info has been replaced by new register!")
+ .toString());
+ return false;
+ }
+ runStatusInfo = brokerRunSyncManageMap.remove(brokerId);
+ if (runStatusInfo == null) {
+ return false;
+ }
+ brokerTotalCount.decrementAndGet();
+ brokerAbnHolder.removeBroker(brokerId);
+ brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
+ logger.info(sBuffer.append("[Broker Release] brokerId=")
+ .append(brokerId).append(" release success!").toString());
+ return true;
+ }
+
+ @Override
+ public void updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
+ Map<String, TopicInfo> topicInfoMap) {
+ brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
+ brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
+
+ }
+
+ @Override
+ public void updBrokerPrdConfInfo(int brokerId, ManageStatus mngStatus,
+ Map<String, TopicInfo> topicInfoMap) {
+ brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
+ brokerPubSubInfo.updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
+ }
+
+ @Override
+ public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet) {
+ return brokerPubSubInfo.getAcceptPubPartInfo(topicSet);
+ }
+
+ @Override
+ public int getSubTopicMaxBrokerCount(Set<String> topicSet) {
+ return brokerPubSubInfo.getTopicMaxSubBrokerCnt(topicSet);
+ }
+
+ @Override
+ public Map<String, Partition> getSubBrokerAcceptSubParts(Set<String> topicSet) {
+ return brokerPubSubInfo.getAcceptSubParts(topicSet);
+ }
+
+ @Override
+ public List<Partition> getSubBrokerAcceptSubParts(String topic) {
+ return brokerPubSubInfo.getAcceptSubParts(topic);
+ }
+
+ @Override
+ public TopicInfo getPubBrokerTopicInfo(int brokerId, String topic) {
+ return brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic);
+ }
+
+ @Override
+ public List<TopicInfo> getPubBrokerPushedTopicInfo(int brokerId) {
+ return brokerPubSubInfo.getPubBrokerPushedTopicInfo(brokerId);
+ }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 19371a1..95c33d9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -17,17 +17,10 @@
package org.apache.tubemq.server.master.nodemanage.nodebroker;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.corebase.cluster.Partition;
-import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
@@ -39,9 +32,6 @@ public class TopicPSInfoManager {
private final TMaster master;
private final ConcurrentHashMap<String/* topic */,
- ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
- new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String/* topic */,
ConcurrentHashSet<String/* producerId */>> topicPubInfoMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* topic */,
@@ -146,147 +136,7 @@ public class TopicPSInfoManager {
}
}
- public ConcurrentHashMap<BrokerInfo, TopicInfo> getBrokerPubInfo(String topic) {
- return brokerPubInfoMap.get(topic);
- }
-
- public void setBrokerPubInfo(String topic,
- ConcurrentHashMap<BrokerInfo, TopicInfo> brokerPubInfo) {
- brokerPubInfoMap.put(topic, brokerPubInfo);
- }
-
- public int getTopicMaxBrokerCount(Set<String> topicSet) {
- int maxCount = -1;
- if (topicSet == null) {
- return maxCount;
- }
- for (String topicTtem : topicSet) {
- if (topicTtem == null) {
- continue;
- }
- ConcurrentHashMap<BrokerInfo, TopicInfo> tmpBrokerMap =
- brokerPubInfoMap.get(topicTtem);
- if (tmpBrokerMap == null) {
- continue;
- }
- int tmpSize =
- tmpBrokerMap.keySet().size();
- if (maxCount < tmpSize) {
- maxCount = tmpSize;
- }
- }
- return maxCount;
- }
-
- public Set<Partition> getPartitions() {
- Set<Partition> partitions = new HashSet<>();
- for (Map<BrokerInfo, TopicInfo> broker
- : this.brokerPubInfoMap.values()) {
- for (Map.Entry<BrokerInfo, TopicInfo> entry
- : broker.entrySet()) {
- TopicInfo topicInfo = entry.getValue();
- for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
- int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
- for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
- partitions.add(new Partition(entry.getKey(),
- topicInfo.getTopic(), baseValue + i));
- }
- }
- }
- }
- return partitions;
- }
-
- public Set<Partition> getPartitions(Set<String> topics) {
- Set<Partition> partList = new HashSet<>();
- for (String topic : topics) {
- partList.addAll(getPartitionSet(topic));
- }
- return partList;
- }
-
- public Map<String, Partition> getPartitionMap(Set<String> topics) {
- Map<String, Partition> partMap = new HashMap<>();
- for (String topic : topics) {
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
- brokerPubInfoMap.get(topic);
- if (topicInfoMap == null) {
- continue;
- }
- for (Map.Entry<BrokerInfo, TopicInfo> entry : topicInfoMap.entrySet()) {
- TopicInfo topicInfo = entry.getValue();
- if (!topicInfo.isAcceptSubscribe()) {
- continue;
- }
- for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
- int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
- for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
- Partition partition =
- new Partition(entry.getKey(), topicInfo.getTopic(), baseValue + i);
- partMap.put(partition.getPartitionKey(), partition);
- }
- }
- }
- }
- return partMap;
- }
-
- public Set<Partition> getPartitionSet(String topic) {
- Set<Partition> partSet = new HashSet<>();
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
- brokerPubInfoMap.get(topic);
- if (topicInfoMap == null) {
- return new HashSet<>();
- }
- for (Map.Entry<BrokerInfo, TopicInfo> entry
- : topicInfoMap.entrySet()) {
- TopicInfo topicInfo = entry.getValue();
- if (!topicInfo.isAcceptSubscribe()) {
- continue;
- }
- for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
- int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
- for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
- partSet.add(new Partition(entry.getKey(),
- topicInfo.getTopic(), baseValue + i));
- }
- }
-
- }
- return partSet;
- }
-
- public List<Partition> getPartitionList(String topic) {
- List<Partition> partList = new ArrayList<>(getPartitionSet(topic));
- return partList;
- }
-
- public TopicInfo getTopicInfo(String topic, BrokerInfo broker) {
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
- brokerPubInfoMap.get(topic);
- if (topicInfoMap != null) {
- return topicInfoMap.get(broker);
- }
- return null;
- }
-
- public List<TopicInfo> getBrokerPubInfoList(BrokerInfo broker) {
- List<TopicInfo> topicInfoList = new ArrayList<>();
- for (Map.Entry<String, ConcurrentHashMap<BrokerInfo, TopicInfo>> pubEntry
- : brokerPubInfoMap.entrySet()) {
- ConcurrentHashMap<BrokerInfo, TopicInfo> topicPubMap =
- pubEntry.getValue();
- for (Map.Entry<BrokerInfo, TopicInfo> entry : topicPubMap.entrySet()) {
- if (entry.getKey().equals(broker)) {
- topicInfoList.add(entry.getValue());
- }
- }
- }
- return topicInfoList;
- }
-
public void clear() {
- brokerPubInfoMap.clear();
topicPubInfoMap.clear();
topicSubInfoMap.clear();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index d9d9279..e3f0612 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -36,6 +36,7 @@ import org.apache.tubemq.corerpc.exception.StandbyException;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerInfoHolder;
import org.apache.tubemq.server.master.web.simplemvc.Action;
@@ -216,16 +217,17 @@ public class Master implements Action {
private void innGetBrokerInfo(final HttpServletRequest req,
StringBuilder sBuilder, boolean isOldRet) {
Map<Integer, BrokerInfo> brokerInfoMap = null;
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
String brokerIds = req.getParameter("ids");
if (TStringUtils.isBlank(brokerIds)) {
- brokerInfoMap = master.getBrokerHolder().getBrokerInfoMap();
+ brokerInfoMap = brokerRunManager.getBrokerInfoMap(null);
} else {
String[] brokerIdArr = brokerIds.split(",");
List<Integer> idList = new ArrayList<>(brokerIdArr.length);
for (String strId : brokerIdArr) {
idList.add(Integer.parseInt(strId));
}
- brokerInfoMap = master.getBrokerHolder().getBrokerInfos(idList);
+ brokerInfoMap = brokerRunManager.getBrokerInfoMap(idList);
}
if (brokerInfoMap != null) {
int index = 1;
@@ -234,8 +236,8 @@ public class Master implements Action {
sBuilder.append("\n################################## ")
.append(index).append(". ").append(broker.toString())
.append(" ##################################\n");
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
- List<TopicInfo> topicInfoList = topicPSInfoManager.getBrokerPubInfoList(broker);
+ List<TopicInfo> topicInfoList =
+ brokerRunManager.getPubBrokerPushedTopicInfo(broker.getBrokerId());
Map<String, TopicDeployEntity> topicConfigMap =
metaDataManager.getBrokerTopicConfEntitySet(broker.getBrokerId());
if (topicConfigMap == null) {
@@ -296,7 +298,7 @@ public class Master implements Action {
*/
private void getUnbalanceGroupInfo(StringBuilder sBuilder) {
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
Map<String, Map<String, Map<String, Partition>>> currentSubInfoMap =
master.getCurrentSubInfoMap();
int currPartSize = 0;
@@ -319,7 +321,8 @@ public class Master implements Action {
}
}
}
- List<Partition> partList = topicPSInfoManager.getPartitionList(topic);
+ List<Partition> partList =
+ brokerRunManager.getSubBrokerAcceptSubParts(topic);
if (currPartSize != partList.size()) {
sBuilder.append(group).append(":").append(topic).append("\n");
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
index 909a888..771aad1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
@@ -28,8 +28,7 @@ import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TubeServerVersion;
import org.apache.tubemq.server.master.TMaster;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.web.common.BrokerQueryResult;
import org.apache.tubemq.server.master.web.model.BrokerVO;
import org.apache.tubemq.server.master.web.simplemvc.Action;
@@ -57,8 +56,9 @@ public class BrokerList implements Action {
int pageSize = TStringUtils.isNotEmpty(strPageSize)
? Integer.parseInt(strPageSize) : 10;
pageSize = pageSize <= 10 ? 10 : pageSize;
- BrokerInfoHolder infoHolder = master.getBrokerHolder();
- List<BrokerInfo> brokerInfoList = new ArrayList(infoHolder.getBrokerInfoMap().values());
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
+ List<BrokerInfo> brokerInfoList =
+ new ArrayList(brokerRunManager.getBrokerInfoMap(null).values());
// *************************************************************************************
for (int i = 0; i < 95; i++) {
BrokerInfo info = new BrokerInfo(i, "192.168.0.1", 8123);
@@ -85,12 +85,12 @@ public class BrokerList implements Action {
+ pageSize;
List<BrokerInfo> firstPageList = brokerInfoList.subList(fromIndex, toIndex);
brokerVOList = new ArrayList<>(brokerInfoList.size());
- TopicPSInfoManager psInfoManager = master.getTopicPSInfoManager();
for (BrokerInfo brokerInfo : firstPageList) {
BrokerVO brokerVO = new BrokerVO();
brokerVO.setId(brokerInfo.getBrokerId());
brokerVO.setIp(brokerInfo.getHost());
- List<TopicInfo> topicInfoList = psInfoManager.getBrokerPubInfoList(brokerInfo);
+ List<TopicInfo> topicInfoList =
+ brokerRunManager.getPubBrokerPushedTopicInfo(brokerInfo.getBrokerId());
brokerVO.setTopicCount(topicInfoList.size());
int totalPartitionNum = 0;
for (TopicInfo topicInfo : topicInfoList) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
index c7fd104..1971caa 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerConfHandler.java
@@ -29,9 +29,9 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.statusdef.StepStatus;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -42,8 +42,9 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerCon
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerAbnHolder;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
/**
@@ -468,8 +469,8 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
return sBuffer;
}
String relReason = (String) result.getRetData();
- BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
- brokerInfoHolder.relAutoForbiddenBrokerInfo(brokerIds, relReason);
+ BrokerAbnHolder abnHolder = master.getBrokerAbnHolder();
+ abnHolder.relAutoForbiddenBrokerInfo(brokerIds, relReason);
WebParameterUtils.buildSuccessResult(sBuffer);
return sBuffer;
}
@@ -534,25 +535,26 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
// query current broker configures
Map<Integer, BrokerConfEntity> brokerConfEntityMap =
metaDataManager.getBrokerConfInfo(brokerIds, brokerIpSet, null);
- BrokerInfoHolder brokerInfoHolder = master.getBrokerHolder();
- Map<Integer, BrokerInfoHolder.BrokerAbnInfo> brokerAbnInfoMap =
- brokerInfoHolder.getBrokerAbnormalMap();
- Map<Integer, BrokerInfoHolder.BrokerFbdInfo> brokerFbdInfoMap =
- brokerInfoHolder.getAutoForbiddenBrokerMapInfo();
+ BrokerAbnHolder abnHolder = master.getBrokerAbnHolder();
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
+ Map<Integer, BrokerAbnHolder.BrokerAbnInfo> brokerAbnInfoMap =
+ abnHolder.getBrokerAbnormalMap();
+ Map<Integer, BrokerAbnHolder.BrokerFbdInfo> brokerFbdInfoMap =
+ abnHolder.getAutoForbiddenBrokerMapInfo();
int totalCnt = 0;
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (BrokerConfEntity entity : brokerConfEntityMap.values()) {
- BrokerInfoHolder.BrokerAbnInfo brokerAbnInfo =
+ BrokerAbnHolder.BrokerAbnInfo brokerAbnInfo =
brokerAbnInfoMap.get(entity.getBrokerId());
if (onlyAbnormal && brokerAbnInfo == null) {
continue;
}
- BrokerInfoHolder.BrokerFbdInfo brokerForbInfo =
+ BrokerAbnHolder.BrokerFbdInfo brokerForbInfo =
brokerFbdInfoMap.get(entity.getBrokerId());
if (onlyAutoForbidden && brokerForbInfo == null) {
continue;
}
- BrokerInfo brokerInfo = brokerInfoHolder.getBrokerInfo(entity.getBrokerId());
+ BrokerInfo brokerInfo = brokerRunManager.getBrokerInfo(entity.getBrokerId());
if (onlyEnableTLS && (brokerInfo == null || !brokerInfo.isEnableTLS())) {
continue;
}
@@ -588,39 +590,41 @@ public class WebBrokerConfHandler extends AbstractWebHandler {
} else {
Tuple2<Boolean, Boolean> pubSubTuple =
entity.getManageStatus().getPubSubStatus();
- BrokerSyncStatusInfo brokerSyncStatusInfo =
- metaDataManager.getBrokerRunSyncStatusInfo(entity.getBrokerId());
- if (brokerSyncStatusInfo == null) {
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunManager.getBrokerRunStatusInfo(entity.getBrokerId());
+ if (runStatusInfo == null) {
sBuffer.append(",\"runStatus\":\"unRegister\",\"subStatus\":\"-\"")
.append(",\"isConfChanged\":\"-\",\"isConfLoaded\":\"-\",\"isBrokerOnline\":\"-\"")
.append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"-\",\"acceptSubscribe\":\"-\"");
} else {
- int stepStatus = brokerSyncStatusInfo.getBrokerRunStatus();
- if (brokerSyncStatusInfo.isBrokerOnline()) {
- if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+ StepStatus stepStatus = runStatusInfo.getCurStepStatus();
+ if (runStatusInfo.isOnline()) {
+ if (stepStatus == StepStatus.STEP_STATUS_UNDEFINED) {
sBuffer.append(",\"runStatus\":\"running\",\"subStatus\":\"idle\"");
} else {
sBuffer.append(",\"runStatus\":\"running\"")
.append(",\"subStatus\":\"processing_event\",\"stepOp\":")
- .append(stepStatus);
+ .append(stepStatus.getCode());
}
} else {
- if (stepStatus == TStatusConstants.STATUS_SERVICE_UNDEFINED) {
+ if (stepStatus == StepStatus.STEP_STATUS_UNDEFINED) {
sBuffer.append(",\"runStatus\":\"notRegister\",\"subStatus\":\"idle\"");
} else {
sBuffer.append(",\"runStatus\":\"notRegister\"")
.append(",\"subStatus\":\"processing_event\",\"stepOp\":")
- .append(stepStatus);
+ .append(stepStatus.getCode());
}
}
- sBuffer.append(",\"isConfChanged\":\"").append(brokerSyncStatusInfo.isBrokerConfChanged())
- .append("\",\"isConfLoaded\":\"").append(brokerSyncStatusInfo.isBrokerLoaded())
- .append("\",\"isBrokerOnline\":\"").append(brokerSyncStatusInfo.isBrokerOnline())
+ Tuple2<Boolean, Boolean> syncTuple =
+ runStatusInfo.getDataSyncStatus();
+ sBuffer.append(",\"isConfChanged\":\"").append(syncTuple.getF0())
+ .append("\",\"isConfLoaded\":\"").append(syncTuple.getF1())
+ .append("\",\"isBrokerOnline\":\"").append(runStatusInfo.isOnline())
.append("\"").append(",\"brokerVersion\":\"-\",\"acceptPublish\":\"")
.append(pubSubTuple.getF0()).append("\",\"acceptSubscribe\":\"")
.append(pubSubTuple.getF1()).append("\"");
if (withDetail) {
- sBuffer = brokerSyncStatusInfo.toJsonString(sBuffer.append(","), false);
+ sBuffer = runStatusInfo.toJsonString(sBuffer.append(","));
}
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index c8f28ba..c045c4d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -23,7 +23,6 @@ import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TServerConstants;
@@ -37,7 +36,7 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSe
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -248,7 +247,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
// query topic configure info
Map<String, List<TopicDeployEntity>> topicConfMap =
metaDataManager.getTopicConfMapByTopicAndBrokerIds(topicNameSet, brokerIds);
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
int totalCount = 0;
int brokerCount = 0;
int totalCfgNumPartCount = 0;
@@ -285,11 +284,8 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
isAcceptPublish = pubSubStatus.getF0();
isAcceptSubscribe = pubSubStatus.getF1();
}
- BrokerInfo broker =
- new BrokerInfo(entity.getBrokerId(),
- entity.getBrokerIp(), entity.getBrokerPort());
- TopicInfo topicInfo = topicPSInfoManager.getTopicInfo(
- entity.getTopicName(), broker);
+ TopicInfo topicInfo =
+ brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
if (topicInfo != null) {
if (isAcceptPublish && topicInfo.isAcceptPublish()) {
isSrvAcceptPublish = true;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index aba06c8..5dc45e9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -25,7 +25,6 @@ import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
@@ -43,8 +42,8 @@ import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.GroupCons
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager;
+import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerRunStatusInfo;
@@ -250,6 +249,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
int dataCount = 0;
int totalStoreNum = 0;
int totalNumPartCount = 0;
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<Integer, List<TopicDeployEntity>> entry : queryResult.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
@@ -260,8 +260,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
if (brokerConf == null) {
continue;
}
- BrokerSyncStatusInfo brokerSyncInfo =
- metaDataManager.getBrokerRunSyncStatusInfo(entry.getKey());
+ BrokerRunStatusInfo runStatusInfo =
+ brokerRunManager.getBrokerRunStatusInfo(entry.getKey());
List<TopicDeployEntity> topicDeployInfo = entry.getValue();
// build return info item
if (dataCount++ > 0) {
@@ -277,22 +277,24 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
brokerConf.getManageStatus().getDescription();
Tuple2<Boolean, Boolean> pubSubStatus =
brokerConf.getManageStatus().getPubSubStatus();
- if (brokerSyncInfo == null) {
+ if (runStatusInfo == null) {
sBuffer.append("\"acceptPublish\":\"-\"")
.append(",\"acceptSubscribe\":\"-\"")
.append(",\"totalPartitionNum\":\"-\"")
.append(",\"totalTopicStoreNum\":\"-\"")
.append(",\"brokerManageStatus\":\"-\"");
} else {
+ Tuple2<Boolean, Boolean> publishTuple =
+ brokerRunManager.getBrokerPublishStatus(entry.getKey());
if (pubSubStatus.getF0()) {
sBuffer.append("\"acceptPublish\":")
- .append(brokerSyncInfo.isAcceptPublish());
+ .append(publishTuple.getF0());
} else {
sBuffer.append("\"acceptPublish\":false");
}
if (pubSubStatus.getF1()) {
sBuffer.append(",\"acceptSubscribe\":")
- .append(brokerSyncInfo.isAcceptSubscribe());
+ .append(publishTuple.getF1());
} else {
sBuffer.append(",\"acceptSubscribe\":false");
}
@@ -494,7 +496,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
boolean isAcceptSubscribe = false;
ManageStatus manageStatus;
Tuple2<Boolean, Boolean> pubSubStatus;
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
ClusterSettingEntity defSetting = metaDataManager.getClusterDefSetting(false);
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
@@ -533,10 +535,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
isAcceptPublish = pubSubStatus.getF0();
isAcceptSubscribe = pubSubStatus.getF1();
}
- BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
- entity.getBrokerIp(), entity.getBrokerPort());
TopicInfo topicInfo =
- topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+ brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
if (topicInfo == null) {
sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
.append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
@@ -634,7 +634,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
boolean isAcceptSubscribe = false;
ManageStatus manageStatus;
Tuple2<Boolean, Boolean> pubSubStatus;
- TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager();
+ BrokerRunManager brokerRunManager = master.getBrokerRunManager();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) {
totalCfgNumPartCount = 0;
@@ -665,10 +665,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
isAcceptPublish = pubSubStatus.getF0();
isAcceptSubscribe = pubSubStatus.getF1();
}
- BrokerInfo broker = new BrokerInfo(entity.getBrokerId(),
- entity.getBrokerIp(), entity.getBrokerPort());
TopicInfo topicInfo =
- topicPSInfoManager.getTopicInfo(entity.getTopicName(), broker);
+ brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
if (topicInfo == null) {
sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
.append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
index 14f91e6..e76b783 100644
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/common/HeartbeatManagerTest.java
@@ -51,7 +51,7 @@ public class HeartbeatManagerTest {
.append(nodeId).toString());
}
});
- heartbeatManager.regBrokerNode("node1");
+ heartbeatManager.regBrokerNode("node1", String.valueOf(System.currentTimeMillis()));
Assert.assertTrue(heartbeatManager.getBrokerRegMap().get("node1").getTimeoutTime() >
System.currentTimeMillis());
}
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java
deleted file mode 100644
index 7f2c051..0000000
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolderTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.nodemanage.nodebroker;
-
-import static org.mockito.Mockito.mock;
-import org.apache.tubemq.corebase.cluster.BrokerInfo;
-import org.apache.tubemq.server.master.metamanage.MetaDataManager;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BrokerInfoHolderTest {
- private BrokerInfoHolder brokerInfoHolder;
- private MetaDataManager metaDataManager;
-
- @Before
- public void setUp() throws Exception {
- metaDataManager = mock(MetaDataManager.class);
- brokerInfoHolder = new BrokerInfoHolder(10, metaDataManager);
- }
-
- @Test
- public void testBrokerInfo() {
- BrokerInfo brokerInfo = mock(BrokerInfo.class);
-
- brokerInfoHolder.setBrokerInfo(1, brokerInfo);
- Assert.assertEquals(brokerInfo, brokerInfoHolder.getBrokerInfo(1));
-
- brokerInfoHolder.removeBroker(1);
- Assert.assertNull(brokerInfoHolder.getBrokerInfo(1));
- }
-}