You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/12 08:54:15 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-508] Optimize
Broker's PB parameter check processing logic (#392)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 1cca49a [TUBEMQ-508] Optimize Broker's PB parameter check processing logic (#392)
1cca49a is described below
commit 1cca49a1c5c28f04334fa822ef1733d701a7213e
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jan 12 16:54:04 2021 +0800
[TUBEMQ-508] Optimize Broker's PB parameter check processing logic (#392)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/server/broker/BrokerServiceServer.java | 184 +++++++++++----------
.../tubemq/server/common/fielddef/WebFieldDef.java | 6 +-
.../server/common/paramcheck/PBParameterUtils.java | 160 +++++++++---------
3 files changed, 181 insertions(+), 169 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 60490c6..c9cfba4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -69,13 +69,14 @@ import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.aaaserver.CertificateBrokerHandler;
import org.apache.tubemq.server.common.aaaserver.CertifiedResult;
import org.apache.tubemq.server.common.exception.HeartbeatException;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
import org.apache.tubemq.server.common.paramcheck.PBParameterUtils;
-import org.apache.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.tubemq.server.common.utils.AppendResult;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.RowLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +272,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ ProcessResult result = new ProcessResult();
+ StringBuilder strBuffer = new StringBuilder(512);
final GetMessageResponseB2C.Builder builder =
GetMessageResponseB2C.newBuilder();
builder.setSuccess(false);
@@ -278,39 +281,37 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setEscFlowCtrl(false);
builder.setCurrDataDlt(-1);
builder.setMinLimitTime(0);
- StringBuilder strBuffer = new StringBuilder(512);
if (!this.started.get()
|| ServiceStatusHolder.isReadServiceStop()) {
builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
builder.setErrMsg("Read StoreService temporary unavailable!");
return builder.build();
}
- // check request's parameters
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check clientId field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+ request.getClientId(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.retData1;
+ // get and check groupName field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+ request.getGroupName(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkConsumeTopicName(request.getTopicName(), this.metadataManager, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String groupName = (String) result.retData1;
+ // get and check topicName field
+ if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
+ this.metadataManager, strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
// get consumer info
- final String topicName = (String) paramCheckResult.checkData;
+ final String topicName = (String) result.retData1;
final int partitionId = request.getPartitionId();
boolean isEscFlowCtrl = request.hasEscFlowCtrl() && request.getEscFlowCtrl();
String partStr = getPartStr(groupName, topicName, partitionId);
@@ -587,6 +588,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ ProcessResult result = new ProcessResult();
final StringBuilder strBuffer = new StringBuilder(512);
SendMessageResponseB2P.Builder builder = SendMessageResponseB2P.newBuilder();
builder.setSuccess(false);
@@ -603,24 +605,23 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check clientId field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+ request.getClientId(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String producerId = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkExistTopicNameInfo(request.getTopicName(),
- request.getPartitionId(), this.metadataManager, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String producerId = (String) result.retData1;
+ // get and check topicName and partitionId field
+ final int partitionId = request.getPartitionId();
+ if (!PBParameterUtils.getTopicNamePartIdInfo(request.getTopicName(),
+ partitionId, this.metadataManager, strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String reqTopic = (String) paramCheckResult.checkData;
- final int partition = request.getPartitionId();
+ final String topicName = (String) result.retData1;
String msgType = null;
int msgTypeCode = -1;
if (TStringUtils.isNotBlank(request.getMsgType())) {
@@ -645,14 +646,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
builder.setErrCode(TErrCodeConstants.FORBIDDEN);
builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
- .append(request.getCheckSum()).append(" of ").append(reqTopic)
+ .append(request.getCheckSum()).append(" of ").append(topicName)
.append(" not equal to the data's checksum of ")
.append(checkSum).toString());
return builder.build();
}
CertifiedResult authorizeResult =
serverAuthHandler.validProduceAuthorizeInfo(
- certResult.userName, reqTopic, msgType, rmtAddress);
+ certResult.userName, topicName, msgType, rmtAddress);
if (!authorizeResult.result) {
builder.setErrCode(authorizeResult.errCode);
builder.setErrMsg(authorizeResult.errInfo);
@@ -660,11 +661,11 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
}
try {
final MessageStore store =
- this.storeManager.getOrCreateMessageStore(reqTopic, partition);
+ this.storeManager.getOrCreateMessageStore(topicName, partitionId);
final AppendResult appendResult = new AppendResult();
if (store.appendMsg(appendResult, dataLength, checkSum, msgData,
- msgTypeCode, request.getFlag(), partition, request.getSentAddr())) {
- String baseKey = strBuffer.append(reqTopic)
+ msgTypeCode, request.getFlag(), partitionId, request.getSentAddr())) {
+ String baseKey = strBuffer.append(topicName)
.append("#").append(AddressUtils.intToIp(request.getSentAddr()))
.append("#").append(tubeConfig.getHostName())
.append("#").append(request.getPartitionId())
@@ -712,6 +713,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
public RegisterResponseB2C consumerRegisterC2B(RegisterRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ ProcessResult result = new ProcessResult();
final StringBuilder strBuffer = new StringBuilder(512);
RegisterResponseB2C.Builder builder = RegisterResponseB2C.newBuilder();
builder.setSuccess(false);
@@ -727,28 +729,31 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult = PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check clientId field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+ request.getClientId(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult
- = PBParameterUtils.checkConsumeTopicName(request.getTopicName(), this.metadataManager, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.retData1;
+ // get and check topicName field
+ if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
+ this.metadataManager, strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String topicName = (String) paramCheckResult.checkData;
- paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get consumer info
+ final String topicName = (String) result.retData1;
+ // get and check groupName field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+ request.getGroupName(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) result.retData1;
boolean isRegister = (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER);
Set<String> filterCondSet = new HashSet<>();
if (request.getFilterCondStrList() != null && !request.getFilterCondStrList().isEmpty()) {
@@ -977,8 +982,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
public HeartBeatResponseB2C consumerHeartbeatC2B(HeartBeatRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
- final HeartBeatResponseB2C.Builder builder = HeartBeatResponseB2C.newBuilder();
+ ProcessResult result = new ProcessResult();
final StringBuilder strBuffer = new StringBuilder(512);
+ final HeartBeatResponseB2C.Builder builder = HeartBeatResponseB2C.newBuilder();
builder.setSuccess(false);
if (!this.started.get()) {
builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
@@ -992,22 +998,22 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setErrMsg(certResult.errInfo);
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check clientId field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+ request.getClientId(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.retData1;
+ // get and check groupName field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+ request.getGroupName(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) result.retData1;
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
List<Partition> partitions =
@@ -1097,8 +1103,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
- final CommitOffsetResponseB2C.Builder builder = CommitOffsetResponseB2C.newBuilder();
+ ProcessResult result = new ProcessResult();
StringBuilder strBuffer = new StringBuilder(512);
+ final CommitOffsetResponseB2C.Builder builder = CommitOffsetResponseB2C.newBuilder();
builder.setSuccess(false);
builder.setCurrOffset(-1);
if (!this.started.get()) {
@@ -1106,32 +1113,31 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setErrMsg("StoreService temporary unavailable!");
return builder.build();
}
- ParamCheckResult paramCheckResult =
- PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check clientId field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+ request.getClientId(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String clientId = (String) paramCheckResult.checkData;
- paramCheckResult =
- PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ final String clientId = (String) result.retData1;
+ // get and check groupName field
+ if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+ request.getGroupName(), strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String groupName = (String) paramCheckResult.checkData;
+ final String groupName = (String) result.retData1;
int partitionId = request.getPartitionId();
- paramCheckResult =
- PBParameterUtils.checkExistTopicNameInfo(
- request.getTopicName(), partitionId, this.metadataManager, strBuffer);
- if (!paramCheckResult.result) {
- builder.setErrCode(paramCheckResult.errCode);
- builder.setErrMsg(paramCheckResult.errMsg);
+ // get and check topicName and partitionId field
+ if (!PBParameterUtils.getTopicNamePartIdInfo(request.getTopicName(),
+ partitionId, this.metadataManager, strBuffer, result)) {
+ builder.setErrCode(result.errCode);
+ builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String topicName = (String) paramCheckResult.checkData;
+ final String topicName = (String) result.retData1;
String partStr = getPartStr(groupName, topicName, partitionId);
ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
if (consumerNodeInfo == null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 2ed75c3..68a833b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -93,7 +93,11 @@ public enum WebFieldDef {
CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
"Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
- "Record modification date", TBaseConstants.META_MAX_DATEVALUE_LENGTH);
+ "Record modification date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
+ HOSTNAME(27, "hostName", "hostName", WebFieldType.STRING,
+ "Host name information", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
+ CLIENTID(28, "clientId", "clientId", WebFieldType.STRING,
+ "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index ed2e122..1c2d5ca 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -42,6 +42,9 @@ import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
+
public class PBParameterUtils {
private static final Logger logger = LoggerFactory.getLogger(PBParameterUtils.class);
@@ -438,85 +441,6 @@ public class PBParameterUtils {
}
/**
- * Check the topic name.
- *
- * @param topicName the topic name to check
- * @param metadataManager the metadata manager which contains topic information
- * @param strBuffer the string buffer used to construct the check result
- * @return the check result
- */
- public static ParamCheckResult checkConsumeTopicName(final String topicName,
- final MetadataManager metadataManager,
- final StringBuilder strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
- if (TStringUtils.isBlank(topicName)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- "Request miss necessary topicName data!");
- return retResult;
- }
- String tmpValue = topicName.trim();
- if (metadataManager.getTopicMetadata(tmpValue) == null) {
- retResult.setCheckResult(false,
- TErrCodeConstants.FORBIDDEN,
- strBuffer.append("Topic ").append(tmpValue)
- .append(" not existed, please check your configure").toString());
- return retResult;
- }
- retResult.setCheckData(tmpValue);
- return retResult;
- }
-
- /**
- * Check the existing topic name info
- *
- * @param topicName the topic name to be checked.
- * @param partitionId the partition ID where the topic locates
- * @param metadataManager the metadata manager which contains topic information
- * @param strBuffer the string buffer used to construct the check result
- * @return the check result
- */
- public static ParamCheckResult checkExistTopicNameInfo(final String topicName,
- final int partitionId,
- final MetadataManager metadataManager,
- final StringBuilder strBuffer) {
- ParamCheckResult retResult = new ParamCheckResult();
- if (TStringUtils.isBlank(topicName)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.BAD_REQUEST,
- "Request miss necessary topicName data!");
- return retResult;
- }
- String tmpValue = topicName.trim();
- TopicMetadata topicMetadata = metadataManager.getTopicMetadata(tmpValue);
- if (topicMetadata == null) {
- retResult.setCheckResult(false,
- TErrCodeConstants.FORBIDDEN,
- strBuffer.append("Topic ").append(tmpValue)
- .append(" not existed, please check your configure").toString());
- return retResult;
- }
- if (metadataManager.isClosedTopic(tmpValue)) {
- retResult.setCheckResult(false,
- TErrCodeConstants.FORBIDDEN,
- strBuffer.append("Topic ").append(tmpValue).append(" has been closed").toString());
- return retResult;
- }
- int realPartition = partitionId < TBaseConstants.META_STORE_INS_BASE
- ? partitionId : partitionId % TBaseConstants.META_STORE_INS_BASE;
- if ((realPartition < 0) || (realPartition >= topicMetadata.getNumPartitions())) {
- retResult.setCheckResult(false,
- TErrCodeConstants.FORBIDDEN,
- strBuffer.append("Partition ")
- .append(tmpValue).append("-").append(partitionId)
- .append(" not existed, please check your configure").toString());
- return retResult;
- }
- retResult.setCheckData(tmpValue);
- return retResult;
- }
-
- /**
* Check the clientID.
*
* @param clientId the client id to be checked
@@ -609,4 +533,82 @@ public class PBParameterUtils {
result.setSuccResult(tmpValue);
return result.success;
}
+
+ /**
+ * Check the topic name.
+ *
+ * @param topicName the topic name to check
+ * @param metadataManager the metadata manager which contains topic information
+ * @param strBuffer the string buffer used to construct the check result
+ * @param result the checked result
+ * @return the check result
+ */
+ public static boolean getTopicNameParameter(String topicName,
+ MetadataManager metadataManager,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ if (!getStringParameter(WebFieldDef.TOPICNAME,
+ topicName, strBuffer, result)) {
+ return result.success;
+ }
+ String tmpValue = (String) result.retData1;
+ if (metadataManager.getTopicMetadata(tmpValue) == null) {
+ result.setFailResult(TErrCodeConstants.FORBIDDEN,
+ strBuffer.append(WebFieldDef.TOPICNAME.name)
+ .append(" ").append(tmpValue)
+ .append(" not existed, please check your configure").toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+ return result.success;
+ }
+
+ /**
+ * Check the existing topic name info
+ *
+ * @param topicName the topic name to be checked.
+ * @param partitionId the partition ID where the topic locates
+ * @param metadataManager the metadata manager which contains topic information
+ * @param strBuffer the string buffer used to construct the check result
+ * @param result the checked result
+ * @return the check result
+ */
+ public static boolean getTopicNamePartIdInfo(String topicName, int partitionId,
+ MetadataManager metadataManager,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ if (!getStringParameter(WebFieldDef.TOPICNAME,
+ topicName, strBuffer, result)) {
+ return result.success;
+ }
+ String tmpValue = (String) result.retData1;
+ TopicMetadata topicMetadata = metadataManager.getTopicMetadata(tmpValue);
+ if (topicMetadata == null) {
+ result.setFailResult(TErrCodeConstants.FORBIDDEN,
+ strBuffer.append(WebFieldDef.TOPICNAME.name)
+ .append(" ").append(tmpValue)
+ .append(" not existed, please check your configure").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return result.success;
+ }
+ if (metadataManager.isClosedTopic(tmpValue)) {
+ result.setFailResult(TErrCodeConstants.FORBIDDEN,
+ strBuffer.append(WebFieldDef.TOPICNAME.name)
+ .append(" ").append(tmpValue)
+ .append(" has been closed").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return result.success;
+ }
+ int realPartition = partitionId < TBaseConstants.META_STORE_INS_BASE
+ ? partitionId : partitionId % TBaseConstants.META_STORE_INS_BASE;
+ if ((realPartition < 0) || (realPartition >= topicMetadata.getNumPartitions())) {
+ result.setFailResult(TErrCodeConstants.FORBIDDEN,
+ strBuffer.append(WebFieldDef.PARTITIONID.name)
+ .append(" ").append(tmpValue).append("-").append(partitionId)
+ .append(" not existed, please check your configure").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return result.success;
+ }
+ result.setSuccResult(tmpValue);
+ return result.success;
+ }
}