You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/11 01:57:56 UTC
[inlong] branch master updated: [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0133149cd [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
0133149cd is described below
commit 0133149cdddab4478be5f45477c066f3d5c72a55
Author: Goson Zhang <46...@qq.com>
AuthorDate: Tue Oct 11 09:57:50 2022 +0800
[INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
---
.../{AllowedSetting.java => MaxMsgSizeHolder.java} | 26 +-
.../tubemq/client/producer/ProducerManager.java | 120 +++----
.../client/producer/SimpleMessageProducer.java | 4 +-
.../tubemq/corebase/utils/DataConverterUtil.java | 60 ++--
.../inlong/tubemq/corebase/utils/Tuple3.java | 9 +
.../inlong/tubemq/corerpc/RemoteConErrStats.java | 4 +-
.../inlong/tubemq/corerpc/client/CallFuture.java | 5 +-
.../corerpc/codec/DataConverterUtilTest.java | 6 +-
.../inlong/tubemq/server/master/TMaster.java | 392 +++++++++++----------
.../nodemanage/nodebroker/BrokerAbnHolder.java | 6 +-
.../nodemanage/nodebroker/BrokerPSInfoHolder.java | 30 +-
.../nodemanage/nodebroker/BrokerRunManager.java | 7 +-
.../nodemanage/nodebroker/BrokerSyncData.java | 1 -
.../nodemanage/nodebroker/BrokerTopicInfoView.java | 44 ++-
.../nodemanage/nodebroker/DefBrokerRunManager.java | 15 +-
.../master/web/handler/WebMasterInfoHandler.java | 37 +-
.../master/web/handler/WebTopicDeployHandler.java | 139 ++++----
17 files changed, 485 insertions(+), 420 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
similarity index 66%
rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
rename to inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
index 22d02dc6b..80c84049b 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
@@ -17,6 +17,8 @@
package org.apache.inlong.tubemq.client.producer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
@@ -24,16 +26,17 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
/**
- * The class class caches the dynamic settings
+ * The class caches the max msg size settings
* returned from the server.
*/
-public class AllowedSetting {
- private AtomicLong configId =
+public class MaxMsgSizeHolder {
+ private final AtomicLong configId =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- private AtomicInteger maxMsgSize =
+ private final AtomicInteger defMaxMsgSize =
new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE);
+ private Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();
- public AllowedSetting() {
+ public MaxMsgSizeHolder() {
}
@@ -44,18 +47,23 @@ public class AllowedSetting {
configId.set(allowedConfig.getConfigId());
}
if (allowedConfig.hasMaxMsgSize()
- && allowedConfig.getMaxMsgSize() != maxMsgSize.get()) {
- maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
+ && allowedConfig.getMaxMsgSize() != defMaxMsgSize.get()) {
+ defMaxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
allowedConfig.getMaxMsgSize()));
}
}
}
+ public void updTopicMaxSizeInB(Map<String, Integer> topicMaxSizeInBMap) {
+ this.topicMaxSizeInBMap = topicMaxSizeInBMap;
+ }
+
public long getConfigId() {
return configId.get();
}
- public int getMaxMsgSize() {
- return maxMsgSize.get();
+ public int getDefMaxMsgSize(String topicName) {
+ Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName);
+ return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB;
}
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index afe929f77..6ce1bfa06 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -52,6 +52,7 @@ import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
@@ -81,8 +82,8 @@ public class ProducerManager {
private final ScheduledExecutorService heartbeatService;
private final AtomicLong visitToken =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- private final AllowedSetting allowedSetting =
- new AllowedSetting();
+ private final MaxMsgSizeHolder msgSizeHolder =
+ new MaxMsgSizeHolder();
private final AtomicReference<String> authAuthorizedTokenRef =
new AtomicReference<>("");
private final ClientAuthenticateHandler authenticateHandler =
@@ -341,10 +342,11 @@ public class ProducerManager {
/**
* Get allowed message size.
*
+ * @param topicName the topic name
* @return max allowed message size
*/
- public int getMaxMsgSize() {
- return allowedSetting.getMaxMsgSize();
+ public int getMaxMsgSize(String topicName) {
+ return msgSizeHolder.getDefMaxMsgSize(topicName);
}
/**
@@ -516,27 +518,32 @@ public class ProducerManager {
return builder.build();
}
- private void updateTopicPartitions(List<TopicInfo> topicInfoList) {
+ private void updateTopicConfigure(
+ Tuple2<Map<String, Integer>, List<TopicInfo>> topicInfoTuple) {
+ int baseValue;
+ Partition tmpPart;
+ List<Partition> partList;
+ Map<Integer, List<Partition>> brokerPartList;
+ // update topic max msg size
+ msgSizeHolder.updTopicMaxSizeInB(topicInfoTuple.getF0());
+ // update topic Partition info
Map<String, Map<Integer, List<Partition>>> partitionListMap =
new ConcurrentHashMap<>();
- for (TopicInfo topicInfo : topicInfoList) {
- Map<Integer, List<Partition>> brokerPartList =
+ for (TopicInfo topicInfo : topicInfoTuple.getF1()) {
+ brokerPartList =
partitionListMap.get(topicInfo.getTopic());
if (brokerPartList == null) {
brokerPartList = new ConcurrentHashMap<>();
partitionListMap.put(topicInfo.getTopic(), brokerPartList);
}
for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) {
- int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
+ baseValue = j * TBaseConstants.META_STORE_INS_BASE;
for (int i = 0; i < topicInfo.getPartitionNum(); i++) {
- Partition part =
- new Partition(topicInfo.getBroker(), topicInfo.getTopic(), baseValue + i);
- List<Partition> partList = brokerPartList.get(part.getBrokerId());
- if (partList == null) {
- partList = new ArrayList<>();
- brokerPartList.put(part.getBrokerId(), partList);
- }
- partList.add(part);
+ tmpPart = new Partition(topicInfo.getBroker(),
+ topicInfo.getTopic(), baseValue + i);
+ partList = brokerPartList.computeIfAbsent(
+ tmpPart.getBrokerId(), k -> new ArrayList<>());
+ partList.add(tmpPart);
}
}
}
@@ -595,20 +602,36 @@ public class ProducerManager {
processAuthorizedToken(response.getAuthorizedInfo());
}
if (response.hasAppdConfig()) {
- procAllowedConfig4P(response.getAppdConfig());
+ msgSizeHolder.updAllowedSetting(response.getAppdConfig());
}
}
- private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response) {
+ private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response,
+ StringBuilder strBuff) {
if (response.hasRequireAuth()) {
nextWithAuthInfo2M.set(response.getRequireAuth());
}
if (response.hasAppdConfig()) {
- procAllowedConfig4P(response.getAppdConfig());
+ msgSizeHolder.updAllowedSetting(response.getAppdConfig());
}
if (response.hasAuthorizedInfo()) {
processAuthorizedToken(response.getAuthorizedInfo());
}
+ if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
+ lastHeartbeatTime = System.currentTimeMillis();
+ return;
+ }
+ if (response.getBrokerCheckSum() != brokerInfoCheckSum) {
+ updateBrokerInfoList(false, response.getBrokerInfosList(),
+ response.getBrokerCheckSum(), strBuff);
+ }
+ if (response.getTopicInfosList().isEmpty()
+ && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) {
+ logger.warn("[Heartbeat Update] found empty topicList update!");
+ lastEmptyTopicPrintTime = System.currentTimeMillis();
+ }
+ updateTopicConfigure(DataConverterUtil
+ .convertTopicInfo(brokersMap, response.getTopicInfosList()));
}
private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthorizedTokenInfo) {
@@ -654,27 +677,20 @@ public class ProducerManager {
private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() {
ClientMaster.ApprovedClientConfig.Builder appdConfig =
ClientMaster.ApprovedClientConfig.newBuilder();
- appdConfig.setConfigId(allowedSetting.getConfigId());
+ appdConfig.setConfigId(msgSizeHolder.getConfigId());
return appdConfig;
}
- // set allowed configure info
- private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig allowedConfig) {
- if (allowedConfig != null) {
- allowedSetting.updAllowedSetting(allowedConfig);
- }
- }
-
// #lizard forgives
private class ProducerHeartbeatTask implements Runnable {
@Override
public void run() {
- StringBuilder sBuilder = new StringBuilder(512);
+ StringBuilder strBuff = new StringBuilder(512);
while (!heartBeatStatus.compareAndSet(0, 1)) {
ThreadUtils.sleep(100);
}
// print metrics information
- clientStatsInfo.selfPrintStatsInfo(false, true, sBuilder);
+ clientStatsInfo.selfPrintStatsInfo(false, true, strBuff);
// check whether public topics
if (publishTopics.isEmpty()) {
return;
@@ -689,71 +705,49 @@ public class ProducerManager {
clientStatsInfo.bookHB2MasterException();
logger.error("[Heartbeat Failed] receive null HeartResponseM2P response!");
} else {
- logger.error(sBuilder.append("[Heartbeat Failed] ")
+ logger.error(strBuff.append("[Heartbeat Failed] ")
.append(response.getErrMsg()).toString());
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
clientStatsInfo.bookHB2MasterTimeout();
try {
register2Master();
} catch (Throwable ee) {
- logger.error(sBuilder
+ logger.error(strBuff
.append("[Heartbeat Failed] re-register failure, error is ")
.append(ee.getMessage()).toString());
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
}
} else {
clientStatsInfo.bookHB2MasterException();
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
- adjustHeartBeatPeriod("certificate failure", sBuilder);
+ adjustHeartBeatPeriod("certificate failure", strBuff);
}
}
}
return;
}
- processHeartBeatSyncInfo(response);
- if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
- lastHeartbeatTime = System.currentTimeMillis();
- return;
- }
- if (response.getBrokerCheckSum() != brokerInfoCheckSum) {
- updateBrokerInfoList(false, response.getBrokerInfosList(),
- response.getBrokerCheckSum(), sBuilder);
- }
- if (response.getTopicInfosList() != null) {
- if (response.getTopicInfosList().isEmpty()
- && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) {
- logger.warn("[Heartbeat Update] found empty topicList update!");
- lastEmptyTopicPrintTime = System.currentTimeMillis();
- }
- updateTopicPartitions(DataConverterUtil
- .convertTopicInfo(brokersMap, response.getTopicInfosList()));
- } else {
- logger.error(sBuilder
- .append("[Heartbeat Failed] Found brokerList or topicList is null, brokerList is ")
- .append(response.getBrokerInfosList() != null).toString());
- sBuilder.delete(0, sBuilder.length());
- }
+ processHeartBeatSyncInfo(response, strBuff);
heartbeatRetryTimes = 0;
long currentTime = System.currentTimeMillis();
if ((currentTime - lastHeartbeatTime)
> (tubeClientConfig.getHeartbeatPeriodMs() * 4)) {
- logger.warn(sBuilder.append(producerId)
+ logger.warn(strBuff.append(producerId)
.append(" heartbeat interval is too long, please check! Total time : ")
.append(currentTime - lastHeartbeatTime).toString());
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
}
lastHeartbeatTime = currentTime;
} catch (Throwable e) {
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
if (!(e.getCause() != null
&& e.getCause() instanceof ClientClosedException)) {
logger.error("Heartbeat failed,retry later.Reason:{}",
- sBuilder.append(e.getClass().getSimpleName())
+ strBuff.append(e.getClass().getSimpleName())
.append("#").append(e.getMessage()).toString());
- sBuilder.delete(0, sBuilder.length());
+ strBuff.delete(0, strBuff.length());
}
- adjustHeartBeatPeriod("heartbeat exception", sBuilder);
+ adjustHeartBeatPeriod("heartbeat exception", strBuff);
} finally {
heartBeatStatus.compareAndSet(1, 0);
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index 291c56e71..e049e9804 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -316,11 +316,11 @@ public class SimpleMessageProducer implements MessageProducer {
}
int msgSize = TStringUtils.isBlank(message.getAttribute())
? message.getData().length : (message.getData().length + message.getAttribute().length());
- if (msgSize > producerManager.getMaxMsgSize()) {
+ if (msgSize > producerManager.getMaxMsgSize(message.getTopic())) {
throw new TubeClientException(new StringBuilder(512)
.append("Illegal parameter: over max message length for the total size of")
.append(" message data and attribute, allowed size is ")
- .append(producerManager.getMaxMsgSize())
+ .append(producerManager.getMaxMsgSize(message.getTopic()))
.append(", message's real size is ").append(msgSize).toString());
}
if (isShutDown.get()) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
index 39b2c5b06..3d63c8e19 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
@@ -96,31 +96,46 @@ public class DataConverterUtil {
/**
* convert string info with a brokerInfo list to @link TopicInfo
*
- * @param brokerInfoMap
+ * @param brokerInfoMap broker configure map
* @param strTopicInfos return a list of TopicInfo
*/
- public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap,
- List<String> strTopicInfos) {
+ public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo(
+ Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) {
List<TopicInfo> topicList = new ArrayList<>();
- if (strTopicInfos != null) {
- for (String info : strTopicInfos) {
- if (info != null) {
- info = info.trim();
- String[] strInfo = info.split(TokenConstants.SEGMENT_SEP);
- String[] strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
- for (String s : strTopicInfoSet) {
- String[] strTopicInfo = s.split(TokenConstants.ATTR_SEP);
- BrokerInfo brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));
- if (brokerInfo != null) {
- topicList.add(new TopicInfo(brokerInfo,
- strInfo[0], Integer.parseInt(strTopicInfo[1]),
- Integer.parseInt(strTopicInfo[2]), true, true));
- }
- }
+ Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();
+ if (strTopicInfos == null || strTopicInfos.isEmpty()) {
+ return new Tuple2<>(topicMaxSizeInBMap, topicList);
+ }
+ String[] strInfo;
+ String[] strTopicInfoSet;
+ String[] strTopicInfo;
+ BrokerInfo brokerInfo;
+ for (String info : strTopicInfos) {
+ if (info == null || info.isEmpty()) {
+ continue;
+ }
+ info = info.trim();
+ strInfo = info.split(TokenConstants.SEGMENT_SEP, -1);
+ strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
+ for (String s : strTopicInfoSet) {
+ strTopicInfo = s.split(TokenConstants.ATTR_SEP);
+ brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0]));
+ if (brokerInfo != null) {
+ topicList.add(new TopicInfo(brokerInfo,
+ strInfo[0], Integer.parseInt(strTopicInfo[1]),
+ Integer.parseInt(strTopicInfo[2]), true, true));
}
}
+ if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) {
+ continue;
+ }
+ try {
+ topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2]));
+ } catch (Throwable e) {
+ //
+ }
}
- return topicList;
+ return new Tuple2<>(topicMaxSizeInBMap, topicList);
}
/**
@@ -166,11 +181,8 @@ public class DataConverterUtil {
}
String topicName = strInfo[0].trim();
String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP);
- TreeSet<String> conditionSet = topicConditions.get(topicName);
- if (conditionSet == null) {
- conditionSet = new TreeSet<>();
- topicConditions.put(topicName, conditionSet);
- }
+ TreeSet<String> conditionSet =
+ topicConditions.computeIfAbsent(topicName, k -> new TreeSet<>());
for (String cond : strCondInfo) {
if (TStringUtils.isNotBlank(cond)) {
conditionSet.add(cond.trim());
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
index 74e628625..2e857f4b6 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java
@@ -56,6 +56,15 @@ public class Tuple3<T0, T1, T2> {
return f2;
}
+ public void setF0AndF1(T0 value0, T1 value1) {
+ this.f0 = value0;
+ this.f1 = value1;
+ }
+
+ public void setF2(T2 value2) {
+ this.f2 = value2;
+ }
+
public void setFieldsValue(T0 value0, T1 value1, T2 value2) {
this.f0 = value0;
this.f1 = value1;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
index deb16d692..35728d5c9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class RemoteConErrStats {
private long statisticDuration = 60000;
private int maxConnAllowedFailCount = 5;
- private AtomicLong errCounter = new AtomicLong(0);
- private AtomicLong lastTimeStamp = new AtomicLong(0);
+ private final AtomicLong errCounter = new AtomicLong(0);
+ private final AtomicLong lastTimeStamp = new AtomicLong(0);
public RemoteConErrStats(final long statisticDuration,
final int maxConnAllowedFailCount) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
index 492017e7e..77837a28d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
* A Future implementation for RPCs.
*/
public class CallFuture<T> implements Future<T>, Callback<T> {
+ private static final String errMsgInfo = "Wait response message timeout!";
private final CountDownLatch latch = new CountDownLatch(1);
private final Callback<T> chainedCallback;
private T result = null;
@@ -131,7 +132,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> {
}
return result;
} else {
- throw new TimeoutException();
+ throw new TimeoutException(errMsgInfo);
}
}
@@ -155,7 +156,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> {
public void await(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!latch.await(timeout, unit)) {
- throw new TimeoutException();
+ throw new TimeoutException(errMsgInfo);
}
}
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
index bc5e26d93..f7ef9576e 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.junit.Test;
public class DataConverterUtilTest {
@@ -65,8 +66,9 @@ public class DataConverterUtilTest {
strInfoList.clear();
// topic#brokerId:partitionNum:topicStoreNum
strInfoList.add("tube#0:10:5");
- List<TopicInfo> topicList = DataConverterUtil.convertTopicInfo(brokerMap, strInfoList);
- assertEquals("topic should be equal", topic, topicList.get(0));
+ Tuple2<Map<String, Integer>, List<TopicInfo>> topicConfTuple =
+ DataConverterUtil.convertTopicInfo(brokerMap, strInfoList);
+ assertEquals("topic should be equal", topic, topicConfTuple.getF1().get(0));
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index c18c54b22..2b802de98 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -82,6 +82,7 @@ import org.apache.inlong.tubemq.corebase.utils.OpsSyncInfo;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
@@ -352,13 +353,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
heartbeatManager.regProducerNode(producerId);
producerHolder.setProducerInfo(producerId,
new HashSet<>(transTopicSet), hostName, overtls);
+ // get current configure information
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
+ Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple =
+ getTopicConfigureInfos(producerId, true);
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
- builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
+ builder.setAuthorizedInfo(genAuthorizedInfo(
+ certResult.authorizedToken, false).build());
ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
- buildApprovedClientConfig(request.getAppdConfig());
+ buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple);
if (clientConfigBuilder != null) {
builder.setAppdConfig(clientConfigBuilder);
}
@@ -443,24 +448,30 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
topicPSInfoManager.addProducerTopicPubInfo(producerId, transTopicSet);
producerHolder.updateProducerInfo(producerId,
transTopicSet, hostName, overtls);
- Map<String, String> availTopicPartitions = getProducerTopicPartitionInfo(producerId);
- builder.addAllTopicInfos(availTopicPartitions.values());
- builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
+ // get current configure information and set
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
+ final Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple =
+ getTopicConfigureInfos(producerId, false);
+ builder.setAuthorizedInfo(genAuthorizedInfo(
+ certResult.authorizedToken, false).build());
builder.setBrokerCheckSum(brokerStaticInfo.getF0());
if (brokerStaticInfo.getF0() != inBrokerCheckSum) {
builder.addAllBrokerInfos(brokerStaticInfo.getF1().values());
}
+ if (prodTopicConfigTuple.getF2() != null) {
+ builder.addAllTopicInfos(prodTopicConfigTuple.getF2().values());
+ }
ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
- buildApprovedClientConfig(request.getAppdConfig());
+ buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple);
if (clientConfigBuilder != null) {
builder.setAppdConfig(clientConfigBuilder);
}
if (logger.isDebugEnabled()) {
logger.debug(strBuffer.append("[Push Producer's available topic count:]")
.append(producerId).append(TokenConstants.LOG_SEG_SEP)
- .append(availTopicPartitions.size()).toString());
+ .append((prodTopicConfigTuple.getF2() == null)
+ ? 0 : prodTopicConfigTuple.getF2().size()).toString());
}
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -564,6 +575,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
final Set<String> reqTopicSet = (Set<String>) result.getRetData();
+ final Map<String, TreeSet<String>> reqTopicConditions =
+ DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL;
@@ -576,8 +589,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
Map<String, Long> requiredPartMap = (Map<String, Long>) paramCheckResult.checkData;
- Map<String, TreeSet<String>> reqTopicConditions =
- DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
String sessionKey = request.hasSessionKey() ? request.getSessionKey() : "";
long sessionTime = request.hasSessionTime()
? request.getSessionTime() : System.currentTimeMillis();
@@ -642,20 +653,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData;
topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
if (CollectionUtils.isNotEmpty(subscribeList)) {
- Map<String, Map<String, Partition>> topicPartSubMap =
- new HashMap<>();
+ int reportCnt = 0;
+ Map<String, Partition> partMap;
+ Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
currentSubInfo.put(consumerId, topicPartSubMap);
+ strBuffer.append("[SubInfo Report] client=").append(consumerId)
+ .append(", subscribed partitions=[");
for (SubscribeInfo info : subscribeList) {
- Map<String, Partition> partMap = topicPartSubMap.get(info.getTopic());
- if (partMap == null) {
- partMap = new HashMap<>();
- topicPartSubMap.put(info.getTopic(), partMap);
- }
+ partMap = topicPartSubMap.computeIfAbsent(
+ info.getTopic(), k -> new HashMap<>());
partMap.put(info.getPartition().getPartitionKey(), info.getPartition());
- logger.info(strBuffer.append("[SubInfo Report]")
- .append(info.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
+ if (reportCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append(info.getPartitionStr());
}
+ strBuffer.append("]");
+ logger.info(strBuffer.toString());
+ strBuffer.delete(0, strBuffer.length());
}
heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
} catch (IOException e) {
@@ -1601,19 +1616,29 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @param producerId
* @return
*/
- private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
- ProducerInfo producerInfo =
- producerHolder.getProducerInfo(producerId);
+ private Tuple3<Long, Integer, Map<String, String>> getTopicConfigureInfos(String producerId,
+ boolean onlyMsgSize) {
+ Tuple3<Long, Integer, Map<String, String>> result = new Tuple3<>();
+ ClusterSettingEntity clusterEntity =
+ defMetaDataService.getClusterDefSetting(false);
+ result.setF0AndF1(clusterEntity.getSerialId(),
+ clusterEntity.getMaxMsgSizeInB());
+ if (onlyMsgSize) {
+ return result;
+ }
+ ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
if (producerInfo == null) {
- return new HashMap<>();
+ return result;
}
- Set<String> producerInfoTopicSet =
- producerInfo.getTopicSet();
- if ((producerInfoTopicSet == null)
- || (producerInfoTopicSet.isEmpty())) {
- return new HashMap<>();
+ Set<String> publishedTopicSet = producerInfo.getTopicSet();
+ if ((publishedTopicSet == null)
+ || (publishedTopicSet.isEmpty())) {
+ return result;
}
- return brokerRunManager.getPubBrokerAcceptPubPartInfo(producerInfoTopicSet);
+ Map<String, Integer> topicAndSizeMap =
+ defMetaDataService.getMaxMsgSizeInBByTopics(result.getF1(), publishedTopicSet);
+ result.setF2(brokerRunManager.getPubBrokerAcceptPubPartInfo(topicAndSizeMap));
+ return result;
}
@Override
@@ -1642,12 +1667,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final StringBuilder strBuffer = new StringBuilder(512);
final long balanceId = idGenerator.incrementAndGet();
if (defMetaDataService != null) {
- logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
+ logger.info(strBuffer.append("[Balance Status] ").append(balanceId)
.append(", isMaster=").append(defMetaDataService.isSelfMaster())
.append(", isPrimaryNodeActive=")
.append(defMetaDataService.isPrimaryNodeActive()).toString());
} else {
- logger.info(strBuffer.append("[Balance Start] ").append(balanceId)
+ logger.info(strBuffer.append("[Balance Status] ").append(balanceId)
.append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString());
}
strBuffer.delete(0, strBuffer.length());
@@ -1655,7 +1680,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
processClientBalanceMetaInfo(balanceId, strBuffer);
// process server-balance
processServerBalance(tMaster, balanceId, strBuffer);
- logger.info(strBuffer.append("[Balance End] ").append(balanceId).toString());
}
private void processServerBalance(TMaster tMaster,
@@ -1663,7 +1687,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
StringBuilder sBuffer) {
int curDoingTasks = this.curSvrBalanceParal.get();
if (curDoingTasks > 0) {
- logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId)
+ logger.info(sBuffer.append("[Svr-Balance Status] ").append(balanceId)
.append(" the Server-Balance has ").append(curDoingTasks)
.append(" task(s) in progress!").toString());
sBuffer.delete(0, sBuffer.length());
@@ -1704,13 +1728,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (subGroups.isEmpty()) {
return;
}
+ final StringBuilder strBuffer = new StringBuilder(512);
// first process reset rebalance task;
try {
tMaster.processResetbalance(balanceId,
- isStartBalance, subGroups);
+ isStartBalance, subGroups, strBuffer);
} catch (Throwable e) {
logger.warn(new StringBuilder(1024)
- .append("[Svr-Balance processor] Error during reset-reb,")
+ .append("[Svr-Balance Status] Error during reset-reb,")
.append("the groups that may be affected are ")
.append(subGroups).append(",error is ")
.append(e).toString());
@@ -1721,16 +1746,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// second process normal balance task;
try {
tMaster.processRebalance(balanceId,
- isStartBalance, subGroups);
+ isStartBalance, subGroups, strBuffer);
} catch (Throwable e) {
logger.warn(new StringBuilder(1024)
- .append("[Svr-Balance processor] Error during normal-reb,")
+ .append("[Svr-Balance Status] Error during normal-reb,")
.append("the groups that may be affected are ")
.append(subGroups).append(",error is ")
.append(e).toString());
}
} catch (Throwable e) {
- logger.warn("[Svr-Balance processor] Error during process", e);
+ logger.warn("[Svr-Balance Status] Error during process", e);
} finally {
if (curSvrBalanceParal.decrementAndGet() == 0) {
MasterSrvStatsHolder.updSvrBalanceDurations(
@@ -1742,14 +1767,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
startupBalance = false;
- logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId).toString());
- sBuffer.delete(0, sBuffer.length());
}
private void processClientBalanceMetaInfo(long balanceId, StringBuilder sBuffer) {
int curDoingTasks = this.curCltBalanceParal.get();
if (curDoingTasks > 0) {
- logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId)
+ logger.info(sBuffer.append("[Clt-Balance Status] ").append(balanceId)
.append(" the Client-Balance has ").append(curDoingTasks)
.append(" task(s) in progress!").toString());
sBuffer.delete(0, sBuffer.length());
@@ -1794,7 +1817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
freshTopicMetaInfo(consumeGroupInfo, sBuffer2);
}
} catch (Throwable e) {
- logger.warn("[Clt-Balance processor] Error during process", e);
+ logger.warn("[Clt-Balance Status] Error during process", e);
} finally {
curCltBalanceParal.decrementAndGet();
}
@@ -1802,8 +1825,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
});
}
}
- logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId).toString());
- sBuffer.delete(0, sBuffer.length());
}
public void freshTopicMetaInfo(ConsumeGroupInfo consumeGroupInfo, StringBuilder sBuffer) {
@@ -1826,7 +1847,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
int count = 0;
int statusId = 0;
- TopicInfo topicInfo;
+ Tuple2<Boolean, TopicInfo> topicSubInfo = new Tuple2<>();
Set<String> fbdTopicSet =
defMetaDataService.getDisableTopicByGroupName(groupInfo.getGroupName());
for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) {
@@ -1834,14 +1855,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
continue;
}
count = 0;
- statusId = 0;
for (TopicDeployEntity deployInfo : entry.getValue()) {
- topicInfo =
- brokerRunManager.getPubBrokerTopicInfo(
- deployInfo.getBrokerId(), deployInfo.getTopicName());
- if (topicInfo != null
- && topicInfo.isAcceptSubscribe()
- && !fbdTopicSet.contains(topicInfo.getTopic())) {
+ statusId = 0;
+ brokerRunManager.getSubBrokerTopicInfo(deployInfo.getBrokerId(),
+ deployInfo.getTopicName(), topicSubInfo);
+ if (topicSubInfo.getF0()
+ && topicSubInfo.getF1() != null
+ && topicSubInfo.getF1().isAcceptSubscribe()
+ && !fbdTopicSet.contains(deployInfo.getTopicName())) {
statusId = 1;
}
if (count++ == 0) {
@@ -1868,11 +1889,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @param rebalanceId the re-balance id
* @param isFirstReb whether is first re-balance
* @param groups the need re-balance group set
+ * @param strBuffer string buffer
*/
- public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+ public void processRebalance(long rebalanceId, boolean isFirstReb,
+ List<String> groups, StringBuilder strBuffer) {
// #lizard forgives
- Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
- final StringBuilder strBuffer = new StringBuilder(512);
+ Map<String, Map<String, List<Partition>>> finalSubInfoMap;
// choose different load balance strategy
if (isFirstReb) {
finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder,
@@ -1881,122 +1903,91 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo,
consumerHolder, brokerRunManager, groups, defMetaDataService, strBuffer);
}
+ boolean included;
+ String consumerId;
+ boolean isDelEmpty;
+ boolean isAddEmtpy;
+ ConsumerInfo consumerInfo;
+ Set<String> blackTopicSet;
+ List<SubscribeInfo> deletedSubInfoList;
+ List<SubscribeInfo> addedSubInfoList;
+ Map<String, Partition> currentPartMap;
+ Map<String, Map<String, Partition>> curTopicSubInfoMap;
// allocate partitions to consumers
for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) {
if (entry == null) {
continue;
}
- String consumerId = entry.getKey();
+ consumerId = entry.getKey();
if (consumerId == null) {
continue;
}
- ConsumerInfo consumerInfo =
- consumerHolder.getConsumerInfo(consumerId);
+ consumerInfo = consumerHolder.getConsumerInfo(consumerId);
if (consumerInfo == null) {
continue;
}
- Set<String> blackTopicSet =
+ addedSubInfoList = new ArrayList<>();
+ deletedSubInfoList = new ArrayList<>();
+ blackTopicSet =
defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName());
- Map<String, List<Partition>> topicSubPartMap = entry.getValue();
- List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
- List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
- for (Map.Entry<String, List<Partition>> topicEntry : topicSubPartMap.entrySet()) {
+ for (Map.Entry<String, List<Partition>> topicEntry : entry.getValue().entrySet()) {
if (topicEntry == null) {
continue;
}
- String topic = topicEntry.getKey();
- List<Partition> finalPartList = topicEntry.getValue();
- Map<String, Partition> currentPartMap = null;
- Map<String, Map<String, Partition>> curTopicSubInfoMap =
- currentSubInfo.get(consumerId);
- if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) {
+ curTopicSubInfoMap = currentSubInfo.get(consumerId);
+ if (curTopicSubInfoMap == null
+ || curTopicSubInfoMap.get(topicEntry.getKey()) == null) {
currentPartMap = new HashMap<>();
} else {
- currentPartMap = curTopicSubInfoMap.get(topic);
+ currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey());
if (currentPartMap == null) {
currentPartMap = new HashMap<>();
}
}
- if (consumerInfo.isOverTLS()) {
- for (Partition currentPart : currentPartMap.values()) {
- if (!blackTopicSet.contains(currentPart.getTopic())) {
- boolean found = false;
- for (Partition newPart : finalPartList) {
- if (newPart.getPartitionFullStr(true)
- .equals(currentPart.getPartitionFullStr(true))) {
- found = true;
- break;
- }
- }
- if (found) {
+ for (Partition currentPart : currentPartMap.values()) {
+ included = false;
+ if (!blackTopicSet.contains(currentPart.getTopic())) {
+ for (Partition newPart : topicEntry.getValue()) {
+ if (newPart == null) {
continue;
}
- }
- deletedSubInfoList
- .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(),
- consumerInfo.isOverTLS(), currentPart));
- }
- for (Partition finalPart : finalPartList) {
- if (!blackTopicSet.contains(finalPart.getTopic())) {
- boolean found = false;
- for (Partition curPart : currentPartMap.values()) {
- if (finalPart.getPartitionFullStr(true)
- .equals(curPart.getPartitionFullStr(true))) {
- found = true;
- break;
- }
- }
- if (found) {
- continue;
+ if (newPart.getPartitionKey().equals(
+ currentPart.getPartitionKey())) {
+ included = true;
+ break;
}
- addedSubInfoList.add(new SubscribeInfo(consumerId,
- consumerInfo.getGroupName(), true, finalPart));
}
}
- } else {
- for (Partition currentPart : currentPartMap.values()) {
- if ((blackTopicSet.contains(currentPart.getTopic()))
- || (!finalPartList.contains(currentPart))) {
- deletedSubInfoList.add(new SubscribeInfo(consumerId,
- consumerInfo.getGroupName(), false, currentPart));
- }
+ if (!included) {
+ deletedSubInfoList.add(new SubscribeInfo(consumerId,
+ consumerInfo.getGroupName(), consumerInfo.isOverTLS(), currentPart));
}
- for (Partition finalPart : finalPartList) {
- if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
- && (!blackTopicSet.contains(finalPart.getTopic()))) {
- addedSubInfoList.add(new SubscribeInfo(consumerId,
- consumerInfo.getGroupName(), false, finalPart));
- }
+ }
+ for (Partition finalPart : topicEntry.getValue()) {
+ if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
+ && (!blackTopicSet.contains(finalPart.getTopic()))) {
+ addedSubInfoList.add(new SubscribeInfo(consumerId,
+ consumerInfo.getGroupName(), consumerInfo.isOverTLS(), finalPart));
}
}
}
- boolean isDelEmpty = deletedSubInfoList.isEmpty();
- boolean isAddEmtpy = addedSubInfoList.isEmpty();
+ isDelEmpty = deletedSubInfoList.isEmpty();
+ isAddEmtpy = addedSubInfoList.isEmpty();
if (!isDelEmpty) {
- EventType opType =
- (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT;
- consumerEventManager
- .addDisconnectEvent(consumerId,
- new ConsumerEvent(rebalanceId, opType,
- deletedSubInfoList, EventStatus.TODO));
- for (SubscribeInfo info : deletedSubInfoList) {
- logger.info(strBuffer.append("[Disconnect]")
- .append(info.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- }
+ consumerEventManager.addDisconnectEvent(consumerId,
+ new ConsumerEvent(rebalanceId,
+ (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT,
+ deletedSubInfoList, EventStatus.TODO));
+ printTODOContent(rebalanceId, consumerId,
+ "Disconnect", deletedSubInfoList, strBuffer);
}
if (!isAddEmtpy) {
- EventType opType =
- (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT;
- consumerEventManager
- .addConnectEvent(consumerId,
- new ConsumerEvent(rebalanceId, opType,
- addedSubInfoList, EventStatus.TODO));
- for (SubscribeInfo info : addedSubInfoList) {
- logger.info(strBuffer.append("[Connect]")
- .append(info.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- }
+ consumerEventManager.addConnectEvent(consumerId,
+ new ConsumerEvent(rebalanceId,
+ (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT,
+ addedSubInfoList, EventStatus.TODO));
+ printTODOContent(rebalanceId, consumerId,
+ "Connect", addedSubInfoList, strBuffer);
}
}
}
@@ -2004,10 +1995,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
/**
* process Reset balance
*/
- public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
+ public void processResetbalance(long rebalanceId, boolean isFirstReb,
+ List<String> groups, StringBuilder strBuffer) {
// #lizard forgives
- final StringBuilder strBuffer = new StringBuilder(512);
- Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null;
+ Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap;
// choose different load balance strategy
if (isFirstReb) {
finalSubInfoMap = this.loadBalancer.resetBukAssign(consumerHolder,
@@ -2016,46 +2007,51 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo,
consumerHolder, brokerRunManager, groups, this.defMetaDataService, strBuffer);
}
+ String consumerId;
+ boolean isAddEmtpy;
+ boolean isDelEmpty;
+ ConsumerInfo consumerInfo;
+ Set<String> blackTopicSet;
+ List<SubscribeInfo> addedSubInfoList;
+ List<SubscribeInfo> deletedSubInfoList;
+ Map<String, Partition> finalPartMap;
+ Map<String, Partition> currentPartMap;
+ Map<String, Map<String, Partition>> curTopicSubInfoMap;
// filter
for (Map.Entry<String, Map<String, Map<String, Partition>>> entry
: finalSubInfoMap.entrySet()) {
if (entry == null) {
continue;
}
- String consumerId = entry.getKey();
+ consumerId = entry.getKey();
if (consumerId == null) {
continue;
}
- ConsumerInfo consumerInfo =
- consumerHolder.getConsumerInfo(consumerId);
+ consumerInfo = consumerHolder.getConsumerInfo(consumerId);
if (consumerInfo == null) {
continue;
}
// allocate partitions to consumers
- Set<String> blackTopicSet =
+ addedSubInfoList = new ArrayList<>();
+ deletedSubInfoList = new ArrayList<>();
+ blackTopicSet =
defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName());
- Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
- List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
- List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
- for (Map.Entry<String, Map<String, Partition>> topicEntry : topicSubPartMap.entrySet()) {
+ for (Map.Entry<String, Map<String, Partition>> topicEntry : entry.getValue().entrySet()) {
if (topicEntry == null) {
continue;
}
- String topic = topicEntry.getKey();
- Map<String, Partition> finalPartMap = topicEntry.getValue();
- Map<String, Partition> currentPartMap = null;
- Map<String, Map<String, Partition>> curTopicSubInfoMap =
- currentSubInfo.get(consumerId);
+ curTopicSubInfoMap = currentSubInfo.get(consumerId);
if (curTopicSubInfoMap == null
- || curTopicSubInfoMap.get(topic) == null) {
+ || curTopicSubInfoMap.get(topicEntry.getKey()) == null) {
currentPartMap = new HashMap<>();
} else {
- currentPartMap = curTopicSubInfoMap.get(topic);
+ currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey());
if (currentPartMap == null) {
currentPartMap = new HashMap<>();
}
}
// filter
+ finalPartMap = topicEntry.getValue();
for (Partition currentPart : currentPartMap.values()) {
if ((blackTopicSet.contains(currentPart.getTopic()))
|| (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
@@ -2073,35 +2069,55 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
// generate consumer event
- boolean isDelEmpty = deletedSubInfoList.isEmpty();
- boolean isAddEmtpy = addedSubInfoList.isEmpty();
+ isDelEmpty = deletedSubInfoList.isEmpty();
+ isAddEmtpy = addedSubInfoList.isEmpty();
if (!isDelEmpty) {
- EventType opType =
- (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT;
consumerEventManager.addDisconnectEvent(consumerId,
- new ConsumerEvent(rebalanceId, opType,
+ new ConsumerEvent(rebalanceId,
+ (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT,
deletedSubInfoList, EventStatus.TODO));
- for (SubscribeInfo info : deletedSubInfoList) {
- logger.info(strBuffer.append("[ResetDisconnect]")
- .append(info.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- }
+ printTODOContent(rebalanceId, consumerId,
+ "ResetDisconnect", deletedSubInfoList, strBuffer);
}
if (!isAddEmtpy) {
- EventType opType =
- (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT;
consumerEventManager.addConnectEvent(consumerId,
- new ConsumerEvent(rebalanceId, opType,
+ new ConsumerEvent(rebalanceId,
+ (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT,
addedSubInfoList, EventStatus.TODO));
- for (SubscribeInfo info : addedSubInfoList) {
- logger.info(strBuffer.append("[ResetConnect]")
- .append(info.toString()).toString());
- strBuffer.delete(0, strBuffer.length());
- }
+ printTODOContent(rebalanceId, consumerId,
+ "ResetConnect", addedSubInfoList, strBuffer);
}
}
}
+ /**
+ * Print the TODO subscribe info
+ *
+ * @param rebalanceId the rebalance id
+ * @param consumerId the consumer id
+ * @param opType the operation type
+ * @param subInfoList the subscribe set
+ * @param strBuffer the string buffer
+ */
+ private void printTODOContent(long rebalanceId, String consumerId, String opType,
+ List<SubscribeInfo> subInfoList, StringBuilder strBuffer) {
+ int recordCnt = 0;
+ strBuffer.append("[").append(opType).append("] TODO, rebalanceId=").append(rebalanceId)
+ .append(", client=").append(consumerId).append(", partitions=[");
+ for (SubscribeInfo info : subInfoList) {
+ if (info == null) {
+ continue;
+ }
+ if (recordCnt++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append(info.getPartitionStr());
+ }
+ strBuffer.append("]");
+ logger.info(strBuffer.toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+
/**
* check if master subscribe info consist consumer subscribe info
*
@@ -2177,6 +2193,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
List<String> groupsNeedToBalance = new ArrayList<>();
Set<String> groupHasUnfinishedEvent = new HashSet<>();
if (consumerEventManager.hasEvent()) {
+ String group;
Set<String> consumerIdSet =
consumerEventManager.getUnProcessedIdSet();
Map<String, TimeoutInfo> heartbeatMap =
@@ -2185,7 +2202,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- String group = consumerHolder.getGroupName(consumerId);
+ group = consumerHolder.getGroupName(consumerId);
if (group == null) {
continue;
}
@@ -2238,23 +2255,20 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* build approved client configure
*
* @param inClientConfig client reported Configure info
+ * @param prodConfigTuple published max topic size tuple
* @return ApprovedClientConfig
*/
private ClientMaster.ApprovedClientConfig.Builder buildApprovedClientConfig(
- ClientMaster.ApprovedClientConfig inClientConfig) {
+ ClientMaster.ApprovedClientConfig inClientConfig,
+ Tuple3<Long, Integer, Map<String, String>> prodConfigTuple) {
ClientMaster.ApprovedClientConfig.Builder outClientConfig = null;
- if (inClientConfig != null) {
- outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
- ClusterSettingEntity settingEntity =
- this.defMetaDataService.getClusterDefSetting(false);
- if (settingEntity == null) {
- outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
- } else {
- outClientConfig.setConfigId(settingEntity.getSerialId());
- if (settingEntity.getSerialId() != inClientConfig.getConfigId()) {
- outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
- }
- }
+ if (inClientConfig == null) {
+ return outClientConfig;
+ }
+ outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
+ outClientConfig.setConfigId(prodConfigTuple.getF0());
+ if (inClientConfig.getConfigId() != prodConfigTuple.getF0()) {
+ outClientConfig.setMaxMsgSize(prodConfigTuple.getF1());
}
return outClientConfig;
}
@@ -2407,7 +2421,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(consumerId).append(" report the info: [");
printHeader = false;
}
- sBuffer.append(type).append(info.toString()).append(", ");
+ sBuffer.append(type).append(info).append(", ");
return printHeader;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 8b066bab9..7a90fab08 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -165,8 +165,8 @@ public class BrokerAbnHolder {
if (brokerFbdInfo == null) {
return retTuple;
}
- retTuple.setF0AndF1(brokerFbdInfo.newStatus.isAcceptPublish(),
- brokerFbdInfo.newStatus.isAcceptSubscribe());
+ retTuple.setF0AndF1(!brokerFbdInfo.newStatus.isAcceptPublish(),
+ !brokerFbdInfo.newStatus.isAcceptSubscribe());
return retTuple;
}
@@ -191,7 +191,6 @@ public class BrokerAbnHolder {
brokerForbiddenCount.set(0);
brokerAbnormalMap.clear();
brokerForbiddenMap.clear();
-
}
public int getCurrentBrokerCount() {
@@ -355,5 +354,4 @@ public class BrokerAbnHolder {
.toString();
}
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index d3cf14c48..d8433c84f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -166,13 +166,26 @@ public class BrokerPSInfoHolder {
return subTopicInfoView.getAcceptSubParts(topic, enableSubBrokerIdSet);
}
+ /**
+ * Get the subscribed TopicInfo information of topic in broker
+ *
+ * @param brokerId need query broker
+ * @param topic need query topic
+ * @param result query result(broker accept subscribe, null or topicInfo configure)
+ */
+ public void getBrokerSubPushedTopicInfo(int brokerId, String topic,
+ Tuple2<Boolean, TopicInfo> result) {
+ result.setF0AndF1(enableSubBrokerIdSet.contains(brokerId),
+ subTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic));
+ }
+
/**
* Gets the string map of topic partitions whose publish status is enabled
*
- * @param topicSet need query topic set
+ * @param topicSizeMap need query topic set and topic's max message size
*/
- public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet) {
- return pubTopicInfoView.getAcceptPubPartInfo(topicSet, enablePubBrokerIdSet);
+ public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap) {
+ return pubTopicInfoView.getAcceptPubPartInfo(topicSizeMap, enablePubBrokerIdSet);
}
/**
@@ -180,11 +193,14 @@ public class BrokerPSInfoHolder {
*
* @param brokerId need query broker
* @param topic need query topic
- *
- * @return null or topicInfo configure
+ * @param result query result(broker accept publish,
+ * broker accept subscribe, null or topicInfo configure)
*/
- public TopicInfo getBrokerPubPushedTopicInfo(int brokerId, String topic) {
- return pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic);
+ public void getBrokerPubPushedTopicInfo(int brokerId, String topic,
+ Tuple3<Boolean, Boolean, TopicInfo> result) {
+ result.setFieldsValue(enablePubBrokerIdSet.contains(brokerId),
+ enableSubBrokerIdSet.contains(brokerId),
+ pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic));
}
/**
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index 73d8e5a1d..1d768a5d3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -92,7 +92,7 @@ public interface BrokerRunManager {
BrokerAbnHolder getBrokerAbnHolder();
- Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet);
+ Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap);
int getSubTopicMaxBrokerCount(Set<String> topicSet);
@@ -100,7 +100,10 @@ public interface BrokerRunManager {
List<Partition> getSubBrokerAcceptSubParts(String topic);
- TopicInfo getPubBrokerTopicInfo(int brokerId, String topic);
+ void getSubBrokerTopicInfo(int brokerId, String topic, Tuple2<Boolean, TopicInfo> result);
+
+ void getPubBrokerTopicInfo(int brokerId, String topic,
+ Tuple3<Boolean, Boolean, TopicInfo> result);
void getPubBrokerPushedTopicInfo(int brokerId,
Tuple3<Boolean, Boolean, List<TopicInfo>> result);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index 77cd64faf..a9a50b764 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -338,5 +338,4 @@ public class BrokerSyncData {
}
return CheckSum.crc32(buffer.array());
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index 6ac080b97..b63251867 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -195,23 +195,23 @@ public class BrokerTopicInfoView {
/**
* Gets the string map of topic partitions whose publish status is enabled
*
- * @param topicSet need query topic set
+ * @param topicSizeMap need query topic and maxsize map
* @param enablePubBrokerIdSet need filtered broker id set
* @return query result
*/
- public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet,
+ public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap,
Set<Integer> enablePubBrokerIdSet) {
TopicInfo topicInfo;
ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
Map<String, String> topicPartStrMap = new HashMap<>();
Map<String, StringBuilder> topicPartBufferMap = new HashMap<>();
- if (topicSet == null || topicSet.isEmpty()) {
+ if (topicSizeMap == null || topicSizeMap.isEmpty()) {
return topicPartStrMap;
}
- for (String topic : topicSet) {
- if (topic == null) {
- continue;
- }
+ // build topic-partition information
+ StringBuilder tmpValue;
+ StringBuilder confValue;
+ for (String topic : topicSizeMap.keySet()) {
topicInfoView = topicConfInfoMap.get(topic);
if (topicInfoView == null
|| topicInfoView.isEmpty()) {
@@ -225,23 +225,33 @@ public class BrokerTopicInfoView {
}
topicInfo = entry.getValue();
if (topicInfo.isAcceptPublish()) {
- StringBuilder tmpValue = topicPartBufferMap.get(topic);
- if (tmpValue == null) {
- StringBuilder strBuffer =
- new StringBuilder(512).append(topic)
- .append(TokenConstants.SEGMENT_SEP)
- .append(topicInfo.getSimpleValue());
- topicPartBufferMap.put(topic, strBuffer);
+ confValue = topicPartBufferMap.get(topic);
+ if (confValue == null) {
+ tmpValue = new StringBuilder(512).append(topic)
+ .append(TokenConstants.SEGMENT_SEP)
+ .append(topicInfo.getSimpleValue());
+ topicPartBufferMap.put(topic, tmpValue);
} else {
- tmpValue.append(TokenConstants.ARRAY_SEP)
+ confValue.append(TokenConstants.ARRAY_SEP)
.append(topicInfo.getSimpleValue());
}
}
}
}
+ // append max message size information
+ Integer maxMsgSize;
for (Map.Entry<String, StringBuilder> entry : topicPartBufferMap.entrySet()) {
- if (entry.getValue() != null) {
- topicPartStrMap.put(entry.getKey(), entry.getValue().toString());
+ if (entry.getValue() == null) {
+ continue;
+ }
+ confValue = topicPartBufferMap.get(entry.getKey());
+ maxMsgSize = topicSizeMap.get(entry.getKey());
+ if (maxMsgSize == null) {
+ topicPartStrMap.put(entry.getKey(),
+ confValue.append(TokenConstants.SEGMENT_SEP).toString());
+ } else {
+ topicPartStrMap.put(entry.getKey(),
+ confValue.append(TokenConstants.SEGMENT_SEP).append(maxMsgSize).toString());
}
}
topicPartBufferMap.clear();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index c274f67a5..26678c2b1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -509,8 +509,8 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver {
}
@Override
- public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet) {
- return brokerPubSubInfo.getAcceptPubPartInfo(topicSet);
+ public Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap) {
+ return brokerPubSubInfo.getAcceptPubPartInfo(topicSizeMap);
}
@Override
@@ -529,8 +529,15 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver {
}
@Override
- public TopicInfo getPubBrokerTopicInfo(int brokerId, String topic) {
- return brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic);
+ public void getSubBrokerTopicInfo(int brokerId, String topic,
+ Tuple2<Boolean, TopicInfo> result) {
+ brokerPubSubInfo.getBrokerSubPushedTopicInfo(brokerId, topic, result);
+ }
+
+ @Override
+ public void getPubBrokerTopicInfo(int brokerId, String topic,
+ Tuple3<Boolean, Boolean, TopicInfo> result) {
+ brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic, result);
}
@Override
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 9543ae632..0dc4dbb97 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
-import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
@@ -68,6 +68,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
// register modify method
registerModifyWebMethod("admin_transfer_current_master",
"transferCurrentMaster");
+ // register modify method
registerModifyWebMethod("admin_set_cluster_default_setting",
"adminSetClusterDefSetting");
registerModifyWebMethod("admin_update_cluster_default_setting",
@@ -274,9 +275,11 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
int totalRunTopicStoreCount = 0;
boolean isSrvAcceptPublish = false;
boolean isSrvAcceptSubscribe = false;
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
boolean enableAuthControl = false;
+ TopicPropGroup topicProps;
+ TopicCtrlEntity authEntity;
+ BrokerConfEntity brokerConfEntity;
+ Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<String, List<TopicDeployEntity>> entry : topicConfMap.entrySet()) {
if (totalCount++ > 0) {
@@ -290,40 +293,32 @@ public class WebMasterInfoHandler extends AbstractWebHandler {
isSrvAcceptPublish = false;
isSrvAcceptSubscribe = false;
enableAuthControl = false;
- isAcceptPublish = false;
- isAcceptSubscribe = false;
for (TopicDeployEntity entity : entry.getValue()) {
- BrokerConfEntity brokerConfEntity =
+ brokerConfEntity =
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
if (brokerConfEntity == null) {
continue;
}
brokerCount++;
- Tuple2<Boolean, Boolean> pubSubStatus =
- WebParameterUtils.getPubSubStatusByManageStatus(
- brokerConfEntity.getManageStatus().getCode());
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
- TopicPropGroup topicProps = entity.getTopicProps();
+ topicProps = entity.getTopicProps();
totalCfgTopicStoreCount += topicProps.getNumTopicStores();
totalCfgNumPartCount +=
topicProps.getNumPartitions() * topicProps.getNumTopicStores();
- TopicInfo topicInfo =
- brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
- if (topicInfo != null) {
- if (isAcceptPublish && topicInfo.isAcceptPublish()) {
+ brokerRunManager.getPubBrokerTopicInfo(
+ entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+ if (topicInfoTuple.getF2() != null) {
+ if (topicInfoTuple.getF0() && topicInfoTuple.getF2().isAcceptPublish()) {
isSrvAcceptPublish = true;
}
- if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) {
+ if (topicInfoTuple.getF1() && topicInfoTuple.getF2().isAcceptSubscribe()) {
isSrvAcceptSubscribe = true;
}
- totalRunTopicStoreCount += topicInfo.getTopicStoreNum();
+ totalRunTopicStoreCount += topicInfoTuple.getF2().getTopicStoreNum();
totalRunNumPartCount +=
- topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
+ topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
}
}
- TopicCtrlEntity authEntity =
- defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
+ authEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
if (authEntity != null) {
enableAuthControl = authEntity.isAuthCtrlEnable();
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 344cb0f71..8d991bbb2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -28,8 +28,8 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
-import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus;
import org.apache.inlong.tubemq.server.common.statusdef.TopicStsChgType;
import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils;
@@ -607,16 +607,19 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
Map<String, List<TopicDeployEntity>> topicDeployInfoMap) {
// build query result
int totalCnt = 0;
+ int itemCount = 0;
+ int condStatusId = 1;
int maxMsgSizeInMB = 0;
int totalCfgNumPartCount = 0;
int totalRunNumPartCount = 0;
boolean enableAuthCtrl;
boolean isSrvAcceptPublish = false;
boolean isSrvAcceptSubscribe = false;
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
- ManageStatus manageStatus;
- Tuple2<Boolean, Boolean> pubSubStatus;
+ String strManageStatus;
+ TopicCtrlEntity ctrlEntity;
+ BrokerConfEntity brokerConfEntity;
+ List<GroupConsumeCtrlEntity> groupCtrlInfoLst;
+ Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
BrokerRunManager brokerRunManager = master.getBrokerRunManager();
ClusterSettingEntity defSetting = defMetaDataService.getClusterDefSetting(false);
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
@@ -625,11 +628,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
totalRunNumPartCount = 0;
isSrvAcceptPublish = false;
isSrvAcceptSubscribe = false;
- isAcceptPublish = false;
- isAcceptSubscribe = false;
- enableAuthCtrl = false;
- TopicCtrlEntity ctrlEntity =
- defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
+ ctrlEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
if (ctrlEntity == null) {
continue;
}
@@ -644,54 +643,55 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
sBuffer.append("{\"topicName\":\"").append(entry.getKey())
.append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
.append(",\"topicInfo\":[");
- int brokerCount = 0;
+ itemCount = 0;
for (TopicDeployEntity entity : entry.getValue()) {
- if (brokerCount++ > 0) {
+ if (itemCount++ > 0) {
sBuffer.append(",");
}
totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
entity.toWebJsonStr(sBuffer, true, false);
sBuffer.append(",\"runInfo\":{");
- BrokerConfEntity brokerConfEntity =
+ strManageStatus = "-";
+ brokerConfEntity =
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
- String strManageStatus = "-";
if (brokerConfEntity != null) {
- manageStatus = brokerConfEntity.getManageStatus();
- strManageStatus = manageStatus.getDescription();
- pubSubStatus = manageStatus.getPubSubStatus();
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
+ strManageStatus = brokerConfEntity.getManageStatus().getDescription();
}
- TopicInfo topicInfo =
- brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
- if (topicInfo == null) {
+ brokerRunManager.getPubBrokerTopicInfo(
+ entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+ if (topicInfoTuple.getF2() == null) {
sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
.append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
} else {
- if (isAcceptPublish) {
- sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
- if (topicInfo.isAcceptPublish()) {
+ if (topicInfoTuple.getF0()) {
+ sBuffer.append("\"acceptPublish\":")
+ .append(topicInfoTuple.getF2().isAcceptPublish());
+ if (topicInfoTuple.getF2().isAcceptPublish()) {
isSrvAcceptPublish = true;
}
} else {
sBuffer.append("\"acceptPublish\":false");
}
- if (isAcceptSubscribe) {
- sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
- if (topicInfo.isAcceptSubscribe()) {
+ if (topicInfoTuple.getF1()) {
+ sBuffer.append(",\"acceptSubscribe\":")
+ .append(topicInfoTuple.getF2().isAcceptSubscribe());
+ if (topicInfoTuple.getF2().isAcceptSubscribe()) {
isSrvAcceptSubscribe = true;
}
} else {
sBuffer.append(",\"acceptSubscribe\":false");
}
- totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
- sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
- .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+ totalRunNumPartCount +=
+ topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
+ sBuffer.append(",\"numPartitions\":")
+ .append(topicInfoTuple.getF2().getPartitionNum())
+ .append(",\"numTopicStores\":")
+ .append(topicInfoTuple.getF2().getTopicStoreNum())
.append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
}
sBuffer.append("}}");
}
- sBuffer.append("],\"infoCount\":").append(brokerCount)
+ sBuffer.append("],\"infoCount\":").append(itemCount)
.append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
.append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
.append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
@@ -702,9 +702,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
.append(",\"createUser\":\"").append(ctrlEntity.getModifyUser())
.append("\",\"createDate\":\"").append(ctrlEntity.getModifyDateStr())
.append("\",\"authConsumeGroup\":[");
- List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
- defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
- int itemCount = 0;
+ itemCount = 0;
+ groupCtrlInfoLst = defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
if (itemCount++ > 0) {
sBuffer.append(",");
@@ -718,7 +717,6 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
sBuffer.append("],\"groupCount\":").append(itemCount).append(",\"authFilterCondSet\":[");
itemCount = 0;
- int condStatusId = 1;
for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
if (itemCount++ > 0) {
sBuffer.append(",");
@@ -753,14 +751,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
Map<String, List<TopicDeployEntity>> topicDeployMap) {
// build query result
int totalCnt = 0;
+ int itemCount = 0;
int totalCfgNumPartCount = 0;
int totalRunNumPartCount = 0;
boolean isSrvAcceptPublish = false;
boolean isSrvAcceptSubscribe = false;
- boolean isAcceptPublish = false;
- boolean isAcceptSubscribe = false;
- ManageStatus manageStatus;
- Tuple2<Boolean, Boolean> pubSubStatus;
+ String strManageStatus;
+ TopicCtrlEntity ctrlEntity;
+ BrokerConfEntity brokerConfEntity;
+ List<GroupConsumeCtrlEntity> groupCtrlInfoLst;
+ Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>();
BrokerRunManager brokerRunManager = master.getBrokerRunManager();
WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) {
@@ -768,9 +768,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
totalRunNumPartCount = 0;
isSrvAcceptPublish = false;
isSrvAcceptSubscribe = false;
- isAcceptPublish = false;
- isAcceptSubscribe = false;
- TopicCtrlEntity ctrlEntity =
+ ctrlEntity =
defMetaDataService.getTopicCtrlByTopicName(entry.getKey());
if (ctrlEntity == null) {
continue;
@@ -780,71 +778,71 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
ctrlEntity.toWebJsonStr(sBuffer, true, false);
sBuffer.append(",\"deployInfo\":[");
- int brokerCount = 0;
+ itemCount = 0;
for (TopicDeployEntity entity : entry.getValue()) {
- if (brokerCount++ > 0) {
+ if (itemCount++ > 0) {
sBuffer.append(",");
}
totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores();
entity.toWebJsonStr(sBuffer, true, false);
sBuffer.append(",\"runInfo\":{");
- BrokerConfEntity brokerConfEntity =
+ brokerConfEntity =
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
-
- String strManageStatus = "-";
+ strManageStatus = "-";
if (brokerConfEntity != null) {
- manageStatus = brokerConfEntity.getManageStatus();
- strManageStatus = manageStatus.getDescription();
- pubSubStatus = manageStatus.getPubSubStatus();
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
+ strManageStatus = brokerConfEntity.getManageStatus().getDescription();
}
- TopicInfo topicInfo =
- brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName());
- if (topicInfo == null) {
+ brokerRunManager.getPubBrokerTopicInfo(
+ entity.getBrokerId(), entity.getTopicName(), topicInfoTuple);
+ if (topicInfoTuple.getF2() == null) {
sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"")
.append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\"");
} else {
- if (isAcceptPublish) {
- sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish());
- if (topicInfo.isAcceptPublish()) {
+ if (topicInfoTuple.getF0()) {
+ sBuffer.append("\"acceptPublish\":")
+ .append(topicInfoTuple.getF2().isAcceptPublish());
+ if (topicInfoTuple.getF2().isAcceptPublish()) {
isSrvAcceptPublish = true;
}
} else {
sBuffer.append("\"acceptPublish\":false");
}
- if (isAcceptSubscribe) {
- sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe());
- if (topicInfo.isAcceptSubscribe()) {
+ if (topicInfoTuple.getF1()) {
+ sBuffer.append(",\"acceptSubscribe\":")
+ .append(topicInfoTuple.getF2().isAcceptSubscribe());
+ if (topicInfoTuple.getF2().isAcceptSubscribe()) {
isSrvAcceptSubscribe = true;
}
} else {
sBuffer.append(",\"acceptSubscribe\":false");
}
- totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum();
- sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum())
- .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum())
+ totalRunNumPartCount +=
+ topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum();
+ sBuffer.append(",\"numPartitions\":")
+ .append(topicInfoTuple.getF2().getPartitionNum())
+ .append(",\"numTopicStores\":")
+ .append(topicInfoTuple.getF2().getTopicStoreNum())
.append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\"");
}
sBuffer.append("}}");
}
- sBuffer.append("],\"infoCount\":").append(brokerCount)
+ sBuffer.append("],\"infoCount\":").append(itemCount)
.append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount)
.append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish)
.append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe)
.append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount);
if (withAuthInfo) {
sBuffer.append(",\"groupAuthInfo\":[");
- List<GroupConsumeCtrlEntity> groupCtrlInfoLst =
+ groupCtrlInfoLst =
defMetaDataService.getConsumeCtrlByTopic(entry.getKey());
- int countJ = 0;
+ itemCount = 0;
for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) {
- if (countJ++ > 0) {
+ if (itemCount++ > 0) {
sBuffer.append(",");
}
groupEntity.toWebJsonStr(sBuffer, true, true);
}
- sBuffer.append("],\"groupAuthCount\":").append(countJ);
+ sBuffer.append("],\"groupAuthCount\":").append(itemCount);
}
sBuffer.append("}");
}
@@ -1073,5 +1071,4 @@ public class WebTopicDeployHandler extends AbstractWebHandler {
}
return buildRetInfo(retInfo, sBuffer);
}
-
}