You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/09 11:55:55 UTC
[inlong] 02/07: [INLONG-7182][TubeMQ] Replace ParamCheckResult with ProcessResult (#7183)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f06a60a40c43ab8e9dbc4b6def0b89089a708b41
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Jan 7 23:05:16 2023 +0800
[INLONG-7182][TubeMQ] Replace ParamCheckResult with ProcessResult (#7183)
---
.../tubemq/server/broker/BrokerServiceServer.java | 28 +-
.../server/common/paramcheck/PBParameterUtils.java | 226 +++++-----
.../server/common/paramcheck/ParamCheckResult.java | 60 ---
.../inlong/tubemq/server/master/TMaster.java | 455 ++++++++++-----------
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 105 +++--
.../nodeconsumer/ConsumerInfoHolder.java | 14 +-
.../tubemq/server/common/PBParameterTest.java | 45 +-
7 files changed, 391 insertions(+), 542 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 658277519..03c661370 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -74,7 +74,6 @@ import org.apache.inlong.tubemq.server.common.TStatusConstants;
import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult;
import org.apache.inlong.tubemq.server.common.exception.HeartbeatException;
-import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener;
@@ -295,16 +294,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
// get and check clientId field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
- request.getClientId(), strBuffer, result)) {
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientId = (String) result.getRetData();
// get and check groupName field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
- request.getGroupName(), strBuffer, result)) {
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -618,8 +615,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
return builder.build();
}
// get and check clientId field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
- request.getClientId(), strBuffer, result)) {
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -832,8 +828,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
ProcessResult result = new ProcessResult();
final StringBuilder strBuffer = new StringBuilder(512);
// get and check clientId field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
- request.getClientId(), strBuffer, result)) {
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -849,8 +844,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
// get consumer info
final String topicName = (String) result.getRetData();
// get and check groupName field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
- request.getGroupName(), strBuffer, result)) {
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -1112,16 +1106,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
return builder.build();
}
// get and check clientId field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
- request.getClientId(), strBuffer, result)) {
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientId = (String) result.getRetData();
// get and check groupName field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
- request.getGroupName(), strBuffer, result)) {
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -1229,16 +1221,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
// get and check clientId field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
- request.getClientId(), strBuffer, result)) {
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
final String clientId = (String) result.getRetData();
// get and check groupName field
- if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
- request.getGroupName(), strBuffer, result)) {
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
index 2e32e4f5d..718b7f649 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -48,17 +48,16 @@ public class PBParameterUtils {
* Check request topic list of producer
*
* @param reqTopicLst the topic list to be checked.
- * @param strBuffer a string buffer used to construct the result
- * @return the check result
+ * @param strBuff a string buffer used to construct the result
+ * @param result the process result
+ * @return success or failure
*/
- public static ParamCheckResult checkProducerTopicList(final List<String> reqTopicLst,
- final StringBuilder strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
+ public static boolean checkProducerTopicList(List<String> reqTopicLst,
+ StringBuilder strBuff, ProcessResult result) {
if (reqTopicLst == null) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary topic field info!");
- return retResult;
+ return result.isSuccess();
}
Set<String> transTopicList = new HashSet<>();
if (!reqTopicLst.isEmpty()) {
@@ -69,27 +68,25 @@ public class PBParameterUtils {
topic = topic.trim();
// filter system topic OFFSET_HISTORY_NAME
if (topic.equals(TServerConstants.OFFSET_HISTORY_NAME)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("System Topic ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("System Topic ")
.append(TServerConstants.OFFSET_HISTORY_NAME)
.append(" does not allow client produce data!").toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
}
transTopicList.add(topic);
}
}
if (transTopicList.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("Booked topic's count over max value, required max count is ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("Booked topic's count over max value, required max count is ")
.append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
}
- retResult.setCheckData(transTopicList);
- return retResult;
+ result.setSuccResult(transTopicList);
+ return result.isSuccess();
}
/**
@@ -98,12 +95,11 @@ public class PBParameterUtils {
* @param depTopicSet the deployed topic set
* @param reqTopicLst the topic list to be checked.
* @param strBuff a string buffer used to construct the result
+ * @param result process result
* @return the check result
*/
public static boolean checkConsumerTopicList(Set<String> depTopicSet,
- List<String> reqTopicLst,
- ProcessResult result,
- StringBuilder strBuff) {
+ List<String> reqTopicLst, StringBuilder strBuff, ProcessResult result) {
if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) {
result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary subscribed topicList data!");
@@ -154,61 +150,54 @@ public class PBParameterUtils {
* @param csmType the topic list to be checked.
* @param reqTopicSet the subscribed topic set
* @param requiredParts the specified partitionKey-bootstrap offset map
- * @param strBuffer the string buffer used to construct the result
+ * @param strBuff the string buffer used to construct the result
* @return the check result
*/
- public static ParamCheckResult checkConsumerOffsetSetInfo(ConsumeType csmType,
- final Set<String> reqTopicSet,
- final String requiredParts,
- final StringBuilder strBuffer) {
+ public static boolean checkConsumerOffsetSetInfo(ConsumeType csmType, Set<String> reqTopicSet,
+ String requiredParts, StringBuilder strBuff, ProcessResult result) {
Map<String, Long> requiredPartMap = new HashMap<>();
- ParamCheckResult retResult = new ParamCheckResult();
if (csmType != ConsumeType.CONSUME_BAND) {
- retResult.setCheckData(requiredPartMap);
- return retResult;
+ result.setSuccResult(requiredPartMap);
+ return result.isSuccess();
}
if (TStringUtils.isBlank(requiredParts)) {
- retResult.setCheckData(requiredPartMap);
- return retResult;
+ result.setSuccResult(requiredPartMap);
+ return result.isSuccess();
}
String[] partOffsetItems = requiredParts.trim().split(TokenConstants.ARRAY_SEP);
for (String partOffset : partOffsetItems) {
String[] partKeyVal = partOffset.split(TokenConstants.EQ);
if (partKeyVal.length == 1) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Parameter error] unformatted Partition-Offset value : ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[Parameter error] unformatted Partition-Offset value : ")
.append(partOffset).append(" must be aa:bbb:ccc=val1,ddd:eee:ff=val2").toString());
- return retResult;
+ return result.isSuccess();
}
String[] partKeyItems = partKeyVal[0].trim().split(TokenConstants.ATTR_SEP);
if (partKeyItems.length != 3) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Parameter error] unformatted Partition-Offset value : ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[Parameter error] unformatted Partition-Offset value : ")
.append(partOffset).append(" must be aa:bbb:ccc=val1,ddd:eee:ff=val2").toString());
- return retResult;
+ return result.isSuccess();
}
if (!reqTopicSet.contains(partKeyItems[1].trim())) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Parameter error] wrong offset reset for unsubscribed topic: reset item is ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[Parameter error] wrong offset reset for unsubscribed topic: reset item is ")
.append(partOffset).append(", request topicList are ")
- .append(reqTopicSet.toString()).toString());
- return retResult;
+ .append(reqTopicSet).toString());
+ return result.isSuccess();
}
try {
requiredPartMap.put(partKeyVal[0].trim(), Long.parseLong(partKeyVal[1].trim()));
} catch (Throwable ex) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Parameter error] required long type value of ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[Parameter error] required long type value of ")
.append(partOffset).append("' Offset!").toString());
- return retResult;
+ return result.isSuccess();
}
}
- retResult.setCheckData(requiredPartMap);
- return retResult;
+ result.setSuccResult(requiredPartMap);
+ return result.isSuccess();
}
/**
@@ -219,43 +208,41 @@ public class PBParameterUtils {
* @param masterConfig the master configure
* @param defMetaDataService the cluster meta information
* @param brokerRunManager the broker running information
- * @param strBuffer the string buffer used to construct the result
+ * @param strBuff the string buffer used to construct the result
+ * @param result the process result
* @return the check result
*/
- public static ParamCheckResult checkConsumerInputInfo(ConsumerInfo inConsumerInfo,
+ public static boolean checkConsumerInputInfo(ConsumerInfo inConsumerInfo,
MasterConfig masterConfig,
MetaDataService defMetaDataService,
BrokerRunManager brokerRunManager,
- StringBuilder strBuffer) throws Exception {
- ParamCheckResult retResult = new ParamCheckResult();
+ StringBuilder strBuff,
+ ProcessResult result) throws Exception {
if (!inConsumerInfo.isRequireBound()) {
- retResult.setCheckData(inConsumerInfo);
- return retResult;
+ result.setSuccResult(inConsumerInfo);
+ return result.isSuccess();
}
if (TStringUtils.isBlank(inConsumerInfo.getSessionKey())) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"[Parameter error] blank value of sessionKey!");
- return retResult;
+ return result.isSuccess();
}
inConsumerInfo.setSessionKey(inConsumerInfo.getSessionKey().trim());
if (inConsumerInfo.getSourceCount() <= 0) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"[Parameter error] totalSourceCount must over zero!");
- return retResult;
+ return result.isSuccess();
}
GroupResCtrlEntity offsetResetGroupEntity =
defMetaDataService.getGroupCtrlConf(inConsumerInfo.getGroupName());
if (masterConfig.isStartOffsetResetCheck()) {
if (offsetResetGroupEntity == null) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[unauthorized subscribe] ConsumeGroup must be ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[unauthorized subscribe] ConsumeGroup must be ")
.append("authorized by administrator before using bound subscribe")
.append(", please contact to administrator!").toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
+ strBuff.delete(0, strBuff.length());
+ return result.isSuccess();
}
}
int allowRate = (offsetResetGroupEntity != null
@@ -270,107 +257,82 @@ public class PBParameterUtils {
if (maxBrokerCount % allowRate != 0) {
minClientCnt += 1;
}
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("[Parameter error] System requires at least ")
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("[Parameter error] System requires at least ")
.append(minClientCnt).append(" clients to consume data together, ")
.append("please add client resources!").toString());
- return retResult;
+ return result.isSuccess();
}
- retResult.setCheckData(inConsumerInfo);
- return retResult;
+ result.setSuccResult(inConsumerInfo);
+ return result.isSuccess();
}
/**
* Check the id of broker
*
* @param brokerId the id of broker to be checked
- * @param strBuffer the string buffer used to construct check result
+ * @param strBuff the string buffer used to construct check result
+ * @param result the process result
* @return the check result
*/
- public static ParamCheckResult checkBrokerId(final String brokerId,
- final StringBuilder strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
+ public static boolean checkBrokerId(String brokerId,
+ StringBuilder strBuff, ProcessResult result) {
if (TStringUtils.isBlank(brokerId)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
"Request miss necessary brokerId data");
- return retResult;
+ return result.isSuccess();
}
String tmpValue = brokerId.trim();
try {
- retResult.setCheckData(Integer.parseInt(tmpValue));
+ result.setSuccResult(Integer.parseInt(tmpValue));
} catch (Throwable e) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("Parse brokerId to int failure ").append(e.getMessage()).toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ strBuff.append("Parse brokerId to int failure ").append(e.getMessage()).toString());
+ strBuff.delete(0, strBuff.length());
}
- return retResult;
+ return result.isSuccess();
}
/**
* Check the clientID.
*
* @param clientId the client id to be checked
- * @param strBuffer the string used to construct the result
+ * @param strBuff the string used to construct the result
+ * @param result process result
* @return the check result
*/
- public static ParamCheckResult checkClientId(final String clientId, final StringBuilder strBuffer) {
- return validStringParameter("clientId",
- clientId, TBaseConstants.META_MAX_CLIENT_ID_LENGTH, strBuffer);
+ public static boolean checkClientId(String clientId,
+ StringBuilder strBuff, ProcessResult result) {
+ return PBParameterUtils.getStringParameter(
+ WebFieldDef.CLIENTID, clientId, strBuff, result);
}
/**
* Check the hostname.
*
* @param hostName the hostname to be checked.
- * @param strBuffer the string used to construct the result
+ * @param strBuff the string used to construct the result
+ * @param result the process result
* @return the check result
*/
- public static ParamCheckResult checkHostName(final String hostName, final StringBuilder strBuffer) {
- return validStringParameter("hostName",
- hostName, TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH, strBuffer);
+ public static boolean checkHostName(String hostName,
+ StringBuilder strBuff, ProcessResult result) {
+ return PBParameterUtils.getStringParameter(
+ WebFieldDef.HOSTNAME, hostName, strBuff, result);
}
/**
* Check the group name
*
* @param groupName the group name to be checked
- * @param strBuffer the string used to construct the result
+ * @param strBuff the string used to construct the result
+ * @param result the process result
* @return the check result
*/
- public static ParamCheckResult checkGroupName(final String groupName, final StringBuilder strBuffer) {
- return validStringParameter("groupName",
- groupName, TBaseConstants.META_MAX_GROUPNAME_LENGTH, strBuffer);
- }
-
- private static ParamCheckResult validStringParameter(final String paramName,
- final String paramValue,
- int paramMaxLen,
- final StringBuilder strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
- if (TStringUtils.isBlank(paramValue)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append("Request miss necessary ")
- .append(paramName).append(" data!").toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
- }
- String tmpValue = paramValue.trim();
- if (tmpValue.length() > paramMaxLen) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- strBuffer.append(paramName)
- .append("'s length over max value, required max length is ")
- .append(paramMaxLen).toString());
- strBuffer.delete(0, strBuffer.length());
- return retResult;
- }
- retResult.setCheckData(tmpValue);
- return retResult;
+ public static boolean checkGroupName(String groupName,
+ StringBuilder strBuff, ProcessResult result) {
+ return PBParameterUtils.getStringParameter(
+ WebFieldDef.GROUPNAME, groupName, strBuff, result);
}
/**
@@ -383,9 +345,7 @@ public class PBParameterUtils {
* @return result success or failure
*/
public static boolean getStringParameter(WebFieldDef fieldDef,
- String paramValue,
- StringBuilder strBuffer,
- ProcessResult result) {
+ String paramValue, StringBuilder strBuffer, ProcessResult result) {
if (TStringUtils.isBlank(paramValue)) {
result.setFailResult(strBuffer.append("Request miss necessary ")
.append(fieldDef.name).append(" data!").toString());
@@ -414,9 +374,7 @@ public class PBParameterUtils {
* @return the check result
*/
public static boolean getTopicNameParameter(String topicName,
- MetadataManager metadataManager,
- StringBuilder strBuffer,
- ProcessResult result) {
+ MetadataManager metadataManager, StringBuilder strBuffer, ProcessResult result) {
if (!getStringParameter(WebFieldDef.TOPICNAME,
topicName, strBuffer, result)) {
return result.isSuccess();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java
deleted file mode 100644
index 8da6562d6..000000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.common.paramcheck;
-
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-
-public class ParamCheckResult {
-
- public boolean result;
- public int errCode;
- public String errMsg;
- public Object checkData;
-
- public ParamCheckResult() {
- this.result = false;
- this.errCode = TErrCodeConstants.BAD_REQUEST;
- this.errMsg = "Unset object!";
- }
-
- public ParamCheckResult(boolean result, int errCode, final String errMsg) {
- this.result = result;
- this.errCode = errCode;
- this.errMsg = errMsg;
- }
-
- public ParamCheckResult(Object checkData) {
- this.result = true;
- this.errCode = TErrCodeConstants.SUCCESS;
- this.errMsg = "Ok";
- this.checkData = checkData;
- }
-
- public void setCheckResult(boolean result, int errCode, final String errMsg) {
- this.result = result;
- this.errCode = errCode;
- this.errMsg = errMsg;
- }
-
- public void setCheckData(Object checkData) {
- this.result = true;
- this.errCode = TErrCodeConstants.SUCCESS;
- this.errMsg = "Ok";
- this.checkData = checkData;
- }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 5b119dd3a..2e1a67075 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -97,7 +97,6 @@ import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener;
import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
-import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo;
import org.apache.inlong.tubemq.server.common.utils.HasThread;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
@@ -310,7 +309,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public RegisterResponseM2P producerRegisterP2M(RegisterRequestP2M request,
final String rmtAddress,
boolean overtls) throws Exception {
- final StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2P.Builder builder = RegisterResponseM2P.newBuilder();
builder.setSuccess(false);
builder.setBrokerCheckSum(-1);
@@ -321,35 +321,30 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String producerId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String producerId = (String) result.getRetData();
+ if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String hostName = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String hostName = (String) result.getRetData();
+ if (!PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final Set<String> transTopicSet = (Set<String>) paramCheckResult.checkData;
+ final Set<String> transTopicSet = (Set<String>) result.getRetData();
if (!request.hasBrokerCheckSum()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg("Request miss necessary brokerCheckSum field!");
return builder.build();
}
- checkNodeStatus(producerId, strBuffer);
+ checkNodeStatus(producerId, strBuff);
CertifiedResult authorizeResult =
serverAuthHandler.validProducerAuthorizeInfo(
certResult.userName, transTopicSet, rmtAddress);
@@ -376,7 +371,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (clientConfigBuilder != null) {
builder.setAppdConfig(clientConfigBuilder);
}
- logger.info(strBuffer.append("[Producer Register] ").append(producerId)
+ logger.info(strBuff.append("[Producer Register] ").append(producerId)
.append(", isOverTLS=").append(overtls)
.append(", clientJDKVer=").append(clientJdkVer).toString());
builder.setSuccess(true);
@@ -398,7 +393,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public HeartResponseM2P producerHeartbeatP2M(HeartRequestP2M request,
final String rmtAddress,
boolean overtls) throws Exception {
- final StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
HeartResponseM2P.Builder builder = HeartResponseM2P.newBuilder();
builder.setSuccess(false);
builder.setBrokerCheckSum(-1);
@@ -409,36 +405,31 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String producerId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String producerId = (String) result.getRetData();
+ if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String hostName = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String hostName = (String) result.getRetData();
+ if (!PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final Set<String> transTopicSet = (Set<String>) paramCheckResult.checkData;
+ final Set<String> transTopicSet = (Set<String>) result.getRetData();
if (!request.hasBrokerCheckSum()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg("Request miss necessary brokerCheckSum field!");
return builder.build();
}
final long inBrokerCheckSum = request.getBrokerCheckSum();
- checkNodeStatus(producerId, strBuffer);
+ checkNodeStatus(producerId, strBuff);
CertifiedResult authorizeResult =
serverAuthHandler.validProducerAuthorizeInfo(
certResult.userName, transTopicSet, rmtAddress);
@@ -477,7 +468,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setAppdConfig(clientConfigBuilder);
}
if (logger.isDebugEnabled()) {
- logger.debug(strBuffer.append("[Push Producer's available topic count:]")
+ logger.debug(strBuff.append("[Push Producer's available topic count:]")
.append(producerId).append(TokenConstants.LOG_SEG_SEP)
.append((prodTopicConfigTuple.getF2() == null)
? 0
@@ -503,7 +494,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public CloseResponseM2P producerCloseClientP2M(CloseRequestP2M request,
final String rmtAddress,
boolean overtls) throws Exception {
- final StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2P.Builder builder = CloseResponseM2P.newBuilder();
builder.setSuccess(false);
CertifiedResult certResult =
@@ -513,18 +505,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String producerId = (String) paramCheckResult.checkData;
- checkNodeStatus(producerId, strBuffer);
+ final String producerId = (String) result.getRetData();
+ checkNodeStatus(producerId, strBuff);
new ReleaseProducer().run(producerId, false);
heartbeatManager.unRegProducerNode(producerId);
- logger.info(strBuffer.append("[Producer Closed] ")
+ logger.info(strBuff.append("[Producer Closed] ")
.append(producerId).append(", isOverTLS=").append(overtls).toString());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -547,7 +537,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
boolean overtls) throws Exception {
// #lizard forgives
ProcessResult result = new ProcessResult();
- final StringBuilder strBuffer = new StringBuilder(512);
+ final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder();
builder.setSuccess(false);
CertifiedResult certResult =
@@ -557,31 +547,27 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String consumerId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String consumerId = (String) result.getRetData();
+ if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- // final String hostName = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // final String hostName = (String) result.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
- checkNodeStatus(consumerId, strBuffer);
+ final String groupName = (String) result.getRetData();
+ checkNodeStatus(consumerId, strBuff);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
- request.getTopicListList(), result, strBuffer)) {
+ request.getTopicListList(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -594,14 +580,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
? ConsumeType.CONSUME_BAND
: ConsumeType.CONSUME_NORMAL;
final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
- paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(csmType,
- reqTopicSet, requiredParts, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkConsumerOffsetSetInfo(
+ csmType, reqTopicSet, requiredParts, strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- Map<String, Long> requiredPartMap = (Map<String, Long>) paramCheckResult.checkData;
+ Map<String, Long> requiredPartMap = (Map<String, Long>) result.getRetData();
String sessionKey = request.hasSessionKey() ? request.getSessionKey() : "";
long sessionTime = request.hasSessionTime()
? request.getSessionTime()
@@ -626,15 +611,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
reqTopicSet, reqTopicConditions, csmType,
sessionKey, sessionTime, sourceCount,
isSelectBig, requiredPartMap, rmtAddress);
- paramCheckResult =
- PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
- masterConfig, defMetaDataService, brokerRunManager, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
+ masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- ConsumerInfo inConsumerInfo2 = (ConsumerInfo) paramCheckResult.checkData;
+ ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData();
CertifiedResult authorizeResult =
serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName,
groupName, reqTopicSet, reqTopicConditions, rmtAddress);
@@ -646,9 +629,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// need removed for authorize center begin
if (!this.defMetaDataService
.isConsumeTargetAuthorized(consumerId, groupName,
- reqTopicSet, reqTopicConditions, strBuffer, result)) {
- if (strBuffer.length() > 0) {
- logger.warn(strBuffer.toString());
+ reqTopicSet, reqTopicConditions, strBuff, result)) {
+ if (strBuff.length() > 0) {
+ logger.warn(strBuff.toString());
}
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -660,32 +643,32 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
try {
lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
if (!consumerHolder.addConsumer(inConsumerInfo2,
- isNotAllocated, strBuffer, paramCheckResult)) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ isNotAllocated, strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData;
+ consumeGroupInfo = (ConsumeGroupInfo) result.getRetData();
topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
if (CollectionUtils.isNotEmpty(subscribeList)) {
int reportCnt = 0;
Map<String, Partition> partMap;
Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
currentSubInfo.put(consumerId, topicPartSubMap);
- strBuffer.append("[SubInfo Report] client=").append(consumerId)
+ strBuff.append("[SubInfo Report] client=").append(consumerId)
.append(", subscribed partitions=[");
for (SubscribeInfo info : subscribeList) {
partMap = topicPartSubMap.computeIfAbsent(
info.getTopic(), k -> new HashMap<>());
partMap.put(info.getPartition().getPartitionKey(), info.getPartition());
if (reportCnt++ > 0) {
- strBuffer.append(",");
+ strBuff.append(",");
}
- strBuffer.append(info.getPartitionStr());
+ strBuff.append(info.getPartitionStr());
}
- strBuffer.append("]");
- logger.info(strBuffer.toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.append("]");
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
}
heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
} catch (IOException e) {
@@ -695,10 +678,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
this.masterRowLock.releaseRowLock(lid);
}
}
- logger.info(strBuffer.append("[Consumer Register] ")
+ logger.info(strBuff.append("[Consumer Register] ")
.append(consumerId).append(", isOverTLS=").append(overtls)
.append(", clientJDKVer=").append(clientJdkVer).toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) {
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
@@ -747,7 +730,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final String rmtAddress,
boolean overtls) throws Throwable {
// #lizard forgives
- final StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
// response
HeartResponseM2C.Builder builder = HeartResponseM2C.newBuilder();
builder.setSuccess(false);
@@ -759,26 +743,23 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
- checkNodeStatus(clientId, strBuffer);
+ final String groupName = (String) result.getRetData();
+ checkNodeStatus(clientId, strBuff);
ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer.append("Not found groupName ")
+ builder.setErrMsg(strBuff.append("Not found groupName ")
.append(groupName).append(" in holder!").toString());
return builder.build();
}
@@ -797,7 +778,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
} catch (HeartbeatException e) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer
+ builder.setErrMsg(strBuff
.append("Update consumer node exception:")
.append(e.getMessage()).toString());
return builder.build();
@@ -834,21 +815,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
partMap.put(regPart.getPartitionKey(), regPart);
}
if (rebalanceId <= 0) {
- logger.warn(strBuffer.append("[Consistent Warn]").append(clientId)
+ logger.warn(strBuff.append("[Consistent Warn]").append(clientId)
.append(" sub info is not consistent with master.").toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
}
}
}
//
if (rebalanceId > 0) {
- logger.info(strBuffer.append("[Event Processed] rebalanceId=")
+ logger.info(strBuff.append("[Event Processed] rebalanceId=")
.append(request.getEvent().getRebalanceId())
.append(", clientId=").append(clientId).toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
try {
consumeGroupInfo.settAllocated();
- consumerEventManager.removeFirst(clientId, strBuffer);
+ consumerEventManager.removeFirst(clientId, strBuff);
} catch (Throwable e) {
logger.warn("Unknown exception for remove first event:", e);
}
@@ -859,9 +840,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (event != null
&& event.getStatus() != EventStatus.PROCESSING) {
event.setStatus(EventStatus.PROCESSING);
- strBuffer.append("[Push Consumer Event]");
- logger.info(event.toStrBuilder(clientId, strBuffer).toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.append("[Push Consumer Event]");
+ logger.info(event.toStrBuilder(clientId, strBuff).toString());
+ strBuff.delete(0, strBuff.length());
EventProto.Builder eventProtoBuilder =
EventProto.newBuilder();
eventProtoBuilder.setRebalanceId(event.getRebalanceId());
@@ -919,7 +900,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public CloseResponseM2C consumerCloseClientC2M(CloseRequestC2M request,
final String rmtAddress,
boolean overtls) throws Exception {
- StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2C.Builder builder = CloseResponseM2C.newBuilder();
builder.setSuccess(false);
CertifiedResult certResult =
@@ -929,24 +911,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
- checkNodeStatus(clientId, strBuffer);
+ final String groupName = (String) result.getRetData();
+ checkNodeStatus(clientId, strBuff);
String nodeId = getConsumerKey(groupName, clientId);
- logger.info(strBuffer.append("[Consumer Closed]").append(nodeId)
+ logger.info(strBuff.append("[Consumer Closed]").append(nodeId)
.append(", isOverTLS=").append(overtls).toString());
new ReleaseConsumer().run(nodeId, false);
heartbeatManager.unRegConsumerNode(nodeId);
@@ -984,18 +963,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
ProcessResult result = new ProcessResult();
- final StringBuilder strBuffer = new StringBuilder(512);
+ final StringBuilder strBuff = new StringBuilder(512);
// get clientId and check valid
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
+ final String clientId = (String) result.getRetData();
// check authority
- checkNodeStatus(clientId, strBuffer);
+ checkNodeStatus(clientId, strBuff);
// get optional filed
ClusterSettingEntity defSetting =
defMetaDataService.getClusterDefSetting(false);
@@ -1016,13 +993,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
request.getCurBrokerConfId(), request.getConfCheckSumId(),
true, request.getBrokerDefaultConfInfo(),
request.getBrokerTopicSetConfInfoList(), request.getBrokerOnline(),
- overtls, strBuffer, result)) {
+ overtls, strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
// print broker register log
- logger.info(strBuffer.append("[Broker Register] ").append(clientId)
+ logger.info(strBuff.append("[Broker Register] ").append(clientId)
.append(" report, configureId=").append(request.getCurBrokerConfId())
.append(",readStatusRpt=").append(request.getReadStatusRpt())
.append(",writeStatusRpt=").append(request.getWriteStatusRpt())
@@ -1031,7 +1008,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",FlowCtrlId=").append(reFlowCtrlId)
.append(",qryPriorityId=").append(qryPriorityId)
.append(",checksumId=").append(request.getConfCheckSumId()).toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
// response
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1047,7 +1024,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
enableInfo.setEnableConsumeAuthenticate(masterConfig.isStartConsumeAuthenticate());
enableInfo.setEnableConsumeAuthorize(masterConfig.isStartConsumeAuthorize());
builder.setEnableBrokerInfo(enableInfo);
- brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuffer, builder);
+ brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuff, builder);
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
buildClusterConfig(request.getClsConfig());
@@ -1065,7 +1042,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
}
- logger.info(strBuffer.append("[Broker Register] ").append(clientId)
+ logger.info(strBuff.append("[Broker Register] ").append(clientId)
.append(", isOverTLS=").append(overtls).toString());
return builder.build();
}
@@ -1105,35 +1082,33 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return builder.build();
}
ProcessResult result = new ProcessResult();
- final StringBuilder strBuffer = new StringBuilder(512);
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final StringBuilder strBuff = new StringBuilder(512);
+ if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- int brokerId = (int) paramCheckResult.checkData;
+ int brokerId = (int) result.getRetData();
long reFlowCtrlId = request.hasFlowCheckId()
? request.getFlowCheckId()
: TBaseConstants.META_VALUE_UNDEFINED;
int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId()
: TBaseConstants.META_VALUE_UNDEFINED;
- checkNodeStatus(String.valueOf(brokerId), strBuffer);
+ checkNodeStatus(String.valueOf(brokerId), strBuff);
if (!brokerRunManager.brokerHeartBeat2M(brokerId,
request.getCurBrokerConfId(), request.getConfCheckSumId(),
request.getTakeConfInfo(), request.getBrokerDefaultConfInfo(),
request.getBrokerTopicSetConfInfoList(), request.getTakeRemovedTopicInfo(),
request.getRemovedTopicsInfoList(), request.getReadStatusRpt(),
request.getWriteStatusRpt(), request.getBrokerOnline(),
- strBuffer, result)) {
+ strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
}
if (request.getTakeConfInfo()) {
- strBuffer.append("[Broker Report] heartbeat report: brokerId=")
+ strBuff.append("[Broker Report] heartbeat report: brokerId=")
.append(request.getBrokerId()).append(", configureId=")
.append(request.getCurBrokerConfId())
.append(",readStatusRpt=").append(request.getReadStatusRpt())
@@ -1145,15 +1120,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",brokerOnline=").append(request.getBrokerOnline())
.append(",default broker configure is ").append(request.getBrokerDefaultConfInfo())
.append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
}
// create response
- brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuffer, builder);
+ brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuff, builder);
BrokerConfEntity brokerConfEntity =
defMetaDataService.getBrokerConfByBrokerId(brokerId);
builder.setTakeRemoveTopicInfo(true);
builder.addAllRemoveTopicConfInfo(defMetaDataService
- .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuffer).values());
+ .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuff).values());
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.hasFlowCheckId()) {
ClusterSettingEntity defSetting =
@@ -1197,7 +1172,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
final String rmtAddress,
boolean overtls) throws Throwable {
ProcessResult result = new ProcessResult();
- StringBuilder strBuffer = new StringBuilder(512);
+ StringBuilder strBuff = new StringBuilder(512);
CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder();
builder.setSuccess(false);
CertifiedResult cfResult =
@@ -1207,16 +1182,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(cfResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final int brokerId = (int) paramCheckResult.checkData;
- checkNodeStatus(String.valueOf(brokerId), strBuffer);
- if (!brokerRunManager.brokerClose2M(brokerId, strBuffer, result)) {
+ final int brokerId = (int) result.getRetData();
+ checkNodeStatus(String.valueOf(brokerId), strBuff);
+ if (!brokerRunManager.brokerClose2M(brokerId, strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -1241,7 +1214,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
String rmtAddress,
boolean overtls) throws Throwable {
ProcessResult result = new ProcessResult();
- final StringBuilder sBuffer = new StringBuilder(512);
+ final StringBuilder strBuff = new StringBuilder(512);
RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder();
CertifiedResult certResult =
serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
@@ -1250,32 +1223,28 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), sBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String consumerId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), sBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String consumerId = (String) result.getRetData();
+ if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- // final String hostName = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), sBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // final String hostName = (String) result.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) result.getRetData();
// check master current status
- checkNodeStatus(consumerId, sBuffer);
+ checkNodeStatus(consumerId, strBuff);
if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
- request.getTopicListList(), result, sBuffer)) {
+ request.getTopicListList(), strBuff, result)) {
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
return builder.build();
@@ -1313,9 +1282,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
// need removed for authorize center begin
if (!this.defMetaDataService
.isConsumeTargetAuthorized(consumerId, groupName,
- reqTopicSet, reqTopicConditions, sBuffer, result)) {
- if (sBuffer.length() > 0) {
- logger.warn(sBuffer.toString());
+ reqTopicSet, reqTopicConditions, strBuff, result)) {
+ if (strBuff.length() > 0) {
+ logger.warn(strBuff.toString());
}
builder.setErrCode(result.getErrCode());
builder.setErrMsg(result.getErrMsg());
@@ -1323,12 +1292,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
// need removed for authorize center end
// check resource require
- paramCheckResult =
- PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
- masterConfig, defMetaDataService, brokerRunManager, sBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
+ masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
CertifiedResult authorizeResult =
@@ -1343,9 +1310,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
try {
lid = masterRowLock.getLock(null,
StringUtils.getBytesUtf8(consumerId), true);
- if (!consumerHolder.addConsumer(inConsumerInfo, false, sBuffer, paramCheckResult)) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!consumerHolder.addConsumer(inConsumerInfo, false, strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
@@ -1360,21 +1327,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
ConsumeGroupInfo consumeGroupInfo =
consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
- logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+ logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
.append(" visit consume group(").append(groupName)
.append(" info failure, null information").toString());
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
- builder.setErrMsg(sBuffer.toString());
- sBuffer.delete(0, sBuffer.length());
+ builder.setErrMsg(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
return builder.build();
}
inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
if (inConsumerInfo == null) {
- logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId)
+ logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
.append(" visit consume info failure, null information").toString());
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
- builder.setErrMsg(sBuffer.toString());
- sBuffer.delete(0, sBuffer.length());
+ builder.setErrMsg(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
return builder.build();
}
Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
@@ -1386,12 +1353,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
partMap.put(info.getPartitionKey(), info);
}
- printReportInfo(consumerId, null, topicPartSubMap, sBuffer);
+ printReportInfo(consumerId, null, topicPartSubMap, strBuff);
}
- logger.info(sBuffer.append("[Consumer Register] ")
+ logger.info(strBuff.append("[Consumer Register] ")
.append(consumerId).append(", isOverTLS=").append(overtls)
.append(", clientJDKVer=").append(clientJdkVer).toString());
- sBuffer.delete(0, sBuffer.length());
+ strBuff.delete(0, strBuff.length());
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
brokerRunManager.getBrokerStaticInfo(overtls);
builder.setBrokerConfigId(brokerStaticInfo.getF0());
@@ -1418,7 +1385,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request,
String rmtAddress,
boolean overtls) throws Throwable {
- final StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult result = new ProcessResult();
+ final StringBuilder strBuff = new StringBuilder(512);
// response
HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder();
// identity valid
@@ -1429,27 +1397,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
+ builder.setErrCode(result.getErrCode());
+ builder.setErrMsg(result.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) result.getRetData();
OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
if (request.hasOpsTaskInfo()) {
opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
}
// check master current status
- checkNodeStatus(clientId, strBuffer);
+ checkNodeStatus(clientId, strBuff);
ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
if (request.hasSubRepInfo()) {
clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
@@ -1457,7 +1422,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer.append("Not found groupName ")
+ builder.setErrMsg(strBuff.append("Not found groupName ")
.append(groupName).append(" in holder!").toString());
return builder.build();
}
@@ -1465,9 +1430,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
consumeGroupInfo.getConsumerInfo(clientId);
if (inConsumerInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+ builder.setErrMsg(strBuff.append("Not found client ").append(clientId)
.append(" in group(").append(groupName).append(")").toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
return builder.build();
}
// authorize check
@@ -1485,7 +1450,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
} catch (HeartbeatException e) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer
+ builder.setErrMsg(strBuff
.append("Update consumer node exception:")
.append(e.getMessage()).toString());
return builder.build();
@@ -1502,7 +1467,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
newPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
partMap.put(info.getPartitionKey(), info);
}
- printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuffer);
+ printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuff);
currentSubInfo.put(clientId, newPartSubMap);
}
Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
@@ -1542,7 +1507,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request,
String rmtAddress,
boolean overtls) throws Throwable {
- StringBuilder strBuffer = new StringBuilder(512);
+ ProcessResult reslut = new ProcessResult();
+ StringBuilder strBuff = new StringBuilder(512);
GetPartMetaResponseM2C.Builder builder = GetPartMetaResponseM2C.newBuilder();
CertifiedResult certResult =
serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false);
@@ -1551,41 +1517,38 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, reslut)) {
+ builder.setErrCode(reslut.getErrCode());
+ builder.setErrMsg(reslut.getErrMsg());
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) reslut.getRetData();
+ if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, reslut)) {
+ builder.setErrCode(reslut.getErrCode());
+ builder.setErrMsg(reslut.getErrMsg());
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) reslut.getRetData();
final long brokerConfigId = request.getBrokerConfigId();
final long topicMetaInfoId = request.getTopicMetaInfoId();
- checkNodeStatus(clientId, strBuffer);
+ checkNodeStatus(clientId, strBuff);
// get control object
ConsumeGroupInfo consumeGroupInfo =
consumerHolder.getConsumeGroupInfo(groupName);
if (consumeGroupInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer.append("Not found groupName ")
+ builder.setErrMsg(strBuff.append("Not found groupName ")
.append(groupName).append(" in holder!").toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
return builder.build();
}
ConsumerInfo inConsumerInfo =
consumeGroupInfo.getConsumerInfo(clientId);
if (inConsumerInfo == null) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer.append("Not found client ").append(clientId)
+ builder.setErrMsg(strBuff.append("Not found client ").append(clientId)
.append(" in group(").append(groupName).append(")").toString());
- strBuffer.delete(0, strBuffer.length());
+ strBuff.delete(0, strBuff.length());
return builder.build();
}
// heartbeat check
@@ -1593,14 +1556,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
} catch (HeartbeatException e) {
builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
- builder.setErrMsg(strBuffer
+ builder.setErrMsg(strBuff
.append("Update consumer node exception:")
.append(e.getMessage()).toString());
return builder.build();
}
Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
if (topicMetaInfoTuple.getF0() == TBaseConstants.META_VALUE_UNDEFINED) {
- freshTopicMetaInfo(consumeGroupInfo, strBuffer);
+ freshTopicMetaInfo(consumeGroupInfo, strBuff);
topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo();
}
builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index f82cd3eaf..58ccb2e35 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -33,9 +33,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
-import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,13 +113,12 @@ public class ConsumeGroupInfo {
* Add consumer to consume group
*
* @param inConsumer consumer object
- * @param sBuffer the string buffer
+ * @param strBuff the string buffer
* @param result the process result
* @return whether the addition is successful
*/
public boolean addConsumer(ConsumerInfo inConsumer,
- StringBuilder sBuffer,
- ParamCheckResult result) {
+ StringBuilder strBuff, ProcessResult result) {
try {
csmInfoRWLock.writeLock().lock();
if (this.consumerInfoMap.isEmpty()) {
@@ -133,14 +132,14 @@ public class ConsumeGroupInfo {
this.sourceCount = inConsumer.getSourceCount();
}
} else {
- if (!validConsumerInfo(inConsumer, sBuffer, result)) {
+ if (!validConsumerInfo(inConsumer, strBuff, result)) {
return false;
}
ConsumerInfo curConsumerInfo =
consumerInfoMap.get(inConsumer.getConsumerId());
if (curConsumerInfo != null) {
curConsumerInfo.updCurConsumerInfo(inConsumer);
- result.setCheckData(false);
+ result.setSuccResult(false);
return true;
}
}
@@ -148,7 +147,7 @@ public class ConsumeGroupInfo {
if (consumeType == ConsumeType.CONSUME_BAND) {
bookPartitionInfo(inConsumer);
}
- result.setCheckData(true);
+ result.setSuccResult(true);
return true;
} finally {
csmInfoRWLock.writeLock().unlock();
@@ -620,38 +619,37 @@ public class ConsumeGroupInfo {
* Check the validity of consumer's parameters
*
* @param inConsumer consumer info
- * @param sBuffer string buffer
+ * @param strBuff string buffer
* @param result process result
* @return true if valid, or false if invalid
*/
private boolean validConsumerInfo(ConsumerInfo inConsumer,
- StringBuilder sBuffer,
- ParamCheckResult result) {
+ StringBuilder strBuff, ProcessResult result) {
// check whether the consumer behavior is consistent
if (inConsumer.getConsumeType() != this.consumeType) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append(" using ").append(inConsumer.getConsumeType().getName())
.append(" subscribe is inconsistency with other consumers using ")
.append(this.consumeType.getName())
.append(" subscribe in the group");
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, sBuffer.toString());
- logger.warn(sBuffer.toString());
- sBuffer.delete(0, sBuffer.length());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, strBuff.toString());
+ logger.warn(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
return false;
}
// check the topics of consumption
if (CollectionUtils.isNotEmpty(topicSet)
&& (topicSet.size() != inConsumer.getTopicSet().size()
|| !topicSet.containsAll(inConsumer.getTopicSet()))) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append(" subscribed topics ").append(inConsumer.getTopicSet())
.append(" is inconsistency with other consumers in the group, existedTopics: ")
.append(topicSet);
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, sBuffer.toString());
- logger.warn(sBuffer.toString());
- sBuffer.delete(0, sBuffer.length());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, strBuff.toString());
+ logger.warn(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
return false;
}
// check the topic conditions of consumption
@@ -659,7 +657,7 @@ public class ConsumeGroupInfo {
if (topicConditions.isEmpty()) {
if (!inConsumer.getTopicConditions().isEmpty()) {
isCondEqual = false;
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append(" subscribe with filter condition ")
.append(inConsumer.getTopicConditions())
.append(" is inconsistency with other consumers in the group: topic without conditions");
@@ -668,7 +666,7 @@ public class ConsumeGroupInfo {
// check the filter conditions of the topic
if (inConsumer.getTopicConditions().isEmpty()) {
isCondEqual = false;
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append(" subscribe without filter condition ")
.append(" is inconsistency with other consumers in the group, existed topic conditions is ")
.append(topicConditions);
@@ -678,7 +676,7 @@ public class ConsumeGroupInfo {
if (existedCondTopics.size() != reqCondTopics.size()
|| !existedCondTopics.containsAll(reqCondTopics)) {
isCondEqual = false;
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append(" subscribe with filter condition ")
.append(inConsumer.getTopicConditions())
.append(" is inconsistency with other consumers in the group, existed topic conditions is ")
@@ -690,7 +688,7 @@ public class ConsumeGroupInfo {
|| (!topicConditions.get(topicKey).containsAll(
inConsumer.getTopicConditions().get(topicKey)))) {
isCondEqual = false;
- sBuffer.append("[Inconsistency subscribe] ")
+ strBuff.append("[Inconsistency subscribe] ")
.append(inConsumer.getConsumerId())
.append(" subscribe with filter condition ")
.append(inConsumer.getTopicConditions())
@@ -704,25 +702,25 @@ public class ConsumeGroupInfo {
}
}
if (!isCondEqual) {
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
// Check the validity of bound consumer's parameters
if (this.consumeType == ConsumeType.CONSUME_BAND) {
- if (!validBoundParameters(inConsumer, sBuffer, result)) {
+ if (!validBoundParameters(inConsumer, strBuff, result)) {
return false;
}
} else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
if (this.sourceCount > 0) {
if (this.sourceCount != inConsumer.getSourceCount()) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s sourceCount is inconsistency with other consumers in the group, required is ")
.append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
boolean foundOccupied = false;
@@ -741,19 +739,19 @@ public class ConsumeGroupInfo {
}
}
if (foundOccupied) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s nodeId value(").append(inConsumer.getNodeId())
.append(") is occupied by ").append(occupiedConsumerId)
.append("'s nodeId value(").append(occupiedNodeId)
.append(") in the group!");
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
}
}
- result.setCheckData("Ok");
+ result.setSuccResult("Ok");
return true;
}
@@ -761,50 +759,49 @@ public class ConsumeGroupInfo {
* Check the validity of bound consumer's parameters
*
* @param inConsumer consumer info
- * @param sBuffer string buffer
+ * @param strBuff string buffer
* @param result process result
* @return true if valid, or false if invalid
*/
private boolean validBoundParameters(ConsumerInfo inConsumer,
- StringBuilder sBuffer,
- ParamCheckResult result) {
+ StringBuilder strBuff, ProcessResult result) {
if (consumeType != ConsumeType.CONSUME_BAND) {
- result.setCheckData("");
+ result.setSuccResult("");
return true;
}
// If the sessionKey is inconsistent, it means that the previous round of consumption has not completely
// exited. In order to avoid the incomplete offset setting, it is necessary to completely clear the above
// data before resetting and consuming this round of consumption
if (!sessionKey.equals(inConsumer.getSessionKey())) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s sessionKey is inconsistency with other consumers in the group, required is ")
.append(sessionKey).append(", request is ").append(inConsumer.getSessionKey());
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
// check the offset config
if (isSelectedBig != inConsumer.isSelectedBig()) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s isSelectBig is inconsistency with other consumers in the group, required is ")
.append(isSelectedBig).append(", request is ").append(inConsumer.isSelectedBig());
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
// check the consumers count
if (sourceCount != inConsumer.getSourceCount()) {
- sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
+ strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s sourceCount is inconsistency with other consumers in the group, required is ")
.append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
- result.setCheckResult(false,
- TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString());
- logger.warn(sBuffer.toString());
+ result.setFailResult(
+ TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, strBuff.toString());
+ logger.warn(strBuff.toString());
return false;
}
- result.setCheckData("Ok");
+ result.setSuccResult("Ok");
return true;
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index db09ca02d..0f600bb94 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -24,9 +24,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.codec.binary.StringUtils;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
-import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.inlong.tubemq.server.common.utils.RowLock;
import org.apache.inlong.tubemq.server.master.MasterConfig;
import org.apache.inlong.tubemq.server.master.TMaster;
@@ -347,12 +347,12 @@ public class ConsumerInfoHolder {
*
* @param consumer consumer info
* @param isNotAllocated whether balanced
- * @param sBuffer string buffer
+ * @param strBuff string buffer
* @param result check result
* @return process result
*/
public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated,
- StringBuilder sBuffer, ParamCheckResult result) {
+ StringBuilder strBuff, ProcessResult result) {
ConsumeGroupInfo consumeGroupInfo;
String group = consumer.getGroupName();
Integer lid = null;
@@ -388,8 +388,8 @@ public class ConsumerInfoHolder {
consumeGroupInfo.isClientBalance());
}
}
- if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) {
- if ((Boolean) result.checkData) {
+ if (consumeGroupInfo.addConsumer(consumer, strBuff, result)) {
+ if ((Boolean) result.getRetData()) {
MasterSrvStatsHolder.incConsumerCnt(false,
consumeGroupInfo.isClientBalance());
}
@@ -397,7 +397,7 @@ public class ConsumerInfoHolder {
consumeGroupInfo.settAllocated();
}
consumerIndexMap.put(consumer.getConsumerId(), group);
- result.setCheckData(consumeGroupInfo);
+ result.setSuccResult(consumeGroupInfo);
}
} catch (IOException e) {
logger.warn("Failed to lock.", e);
@@ -406,7 +406,7 @@ public class ConsumerInfoHolder {
groupRowLock.releaseRowLock(lid);
}
}
- return result.result;
+ return result.isSuccess();
}
/**
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
index 103fd0f11..4ccfcc5b4 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java
@@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils;
-import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult;
import org.junit.Assert;
import org.junit.Test;
@@ -32,52 +31,54 @@ public class PBParameterTest {
@Test
public void checkProducerTopicTest() {
- ParamCheckResult result = PBParameterUtils.checkProducerTopicList(null, null);
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
+ ProcessResult result = new ProcessResult();
+ StringBuilder strBuff = new StringBuilder(128);
+ Assert.assertFalse(PBParameterUtils.checkProducerTopicList(null, strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
final List<String> topicList = new ArrayList<>();
topicList.add("test1");
- result = PBParameterUtils.checkProducerTopicList(topicList, new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
+ Assert.assertTrue(PBParameterUtils.checkProducerTopicList(topicList, strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
for (int i = 0; i < 1025; i++) {
topicList.add("test" + i);
}
- result = PBParameterUtils.checkProducerTopicList(topicList, new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
+ Assert.assertFalse(PBParameterUtils.checkProducerTopicList(topicList, strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
}
@Test
public void checkConsumerTopicTest() {
ProcessResult result = new ProcessResult();
- PBParameterUtils.checkConsumerTopicList(null, null, result, null);
+ StringBuilder strBuff = new StringBuilder(128);
+ PBParameterUtils.checkConsumerTopicList(null, null, strBuff, result);
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
final Set<String> depTopicList = new HashSet<>();
final List<String> reqTopicList = new ArrayList<>();
depTopicList.add("test1");
reqTopicList.add("test1");
- PBParameterUtils.checkConsumerTopicList(depTopicList,
- reqTopicList, result, new StringBuilder(128));
+ PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result);
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
reqTopicList.add("test2");
- PBParameterUtils.checkConsumerTopicList(depTopicList,
- reqTopicList, result, new StringBuilder(128));
+ PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result);
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.TOPIC_NOT_DEPLOYED);
for (int i = 0; i < 1025; i++) {
reqTopicList.add("test" + i);
}
- PBParameterUtils.checkConsumerTopicList(depTopicList,
- reqTopicList, result, new StringBuilder(128));
+ PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result);
Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
}
@Test
public void checkIdTest() {
- ParamCheckResult result = PBParameterUtils.checkClientId("100", new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
- result = PBParameterUtils.checkClientId("", new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
- result = PBParameterUtils.checkBrokerId("100", new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS);
- result = PBParameterUtils.checkBrokerId("", new StringBuilder(128));
- Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST);
+ ProcessResult result = new ProcessResult();
+ StringBuilder strBuff = new StringBuilder(128);
+ Assert.assertTrue(PBParameterUtils.checkClientId("100", strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
+ Assert.assertFalse(PBParameterUtils.checkClientId("", strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
+ Assert.assertTrue(PBParameterUtils.checkBrokerId("100", strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS);
+ Assert.assertFalse(PBParameterUtils.checkBrokerId("", strBuff, result));
+ Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST);
}
}