You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/12 08:54:15 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-508] Optimize Broker's PB parameter check processing logic (#392)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cca49a  [TUBEMQ-508] Optimize Broker's PB parameter check processing logic (#392)
1cca49a is described below

commit 1cca49a1c5c28f04334fa822ef1733d701a7213e
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Jan 12 16:54:04 2021 +0800

    [TUBEMQ-508] Optimize Broker's PB parameter check processing logic (#392)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 184 +++++++++++----------
 .../tubemq/server/common/fielddef/WebFieldDef.java |   6 +-
 .../server/common/paramcheck/PBParameterUtils.java | 160 +++++++++---------
 3 files changed, 181 insertions(+), 169 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 60490c6..c9cfba4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -69,13 +69,14 @@ import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.common.aaaserver.CertificateBrokerHandler;
 import org.apache.tubemq.server.common.aaaserver.CertifiedResult;
 import org.apache.tubemq.server.common.exception.HeartbeatException;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.tubemq.server.common.heartbeat.HeartbeatManager;
 import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
 import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
 import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
 import org.apache.tubemq.server.common.paramcheck.PBParameterUtils;
-import org.apache.tubemq.server.common.paramcheck.ParamCheckResult;
 import org.apache.tubemq.server.common.utils.AppendResult;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.RowLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -271,6 +272,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
                                                 final String rmtAddress,
                                                 boolean overtls) throws Throwable {
+        ProcessResult result = new ProcessResult();
+        StringBuilder strBuffer = new StringBuilder(512);
         final GetMessageResponseB2C.Builder builder =
                 GetMessageResponseB2C.newBuilder();
         builder.setSuccess(false);
@@ -278,39 +281,37 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         builder.setEscFlowCtrl(false);
         builder.setCurrDataDlt(-1);
         builder.setMinLimitTime(0);
-        StringBuilder strBuffer = new StringBuilder(512);
         if (!this.started.get()
                 || ServiceStatusHolder.isReadServiceStop()) {
             builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
             builder.setErrMsg("Read StoreService temporary unavailable!");
             return builder.build();
         }
-        // check request's parameters
-        ParamCheckResult paramCheckResult =
-                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check clientId field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+                request.getClientId(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String clientId = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String clientId = (String) result.retData1;
+        // get and check groupName field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+                request.getGroupName(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String groupName = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkConsumeTopicName(request.getTopicName(), this.metadataManager, strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String groupName = (String) result.retData1;
+        // get and check topicName field
+        if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
+                this.metadataManager, strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
         // get consumer info
-        final String topicName = (String) paramCheckResult.checkData;
+        final String topicName = (String) result.retData1;
         final int partitionId = request.getPartitionId();
         boolean isEscFlowCtrl = request.hasEscFlowCtrl() && request.getEscFlowCtrl();
         String partStr = getPartStr(groupName, topicName, partitionId);
@@ -587,6 +588,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
                                                  final String rmtAddress,
                                                  boolean overtls) throws Throwable {
+        ProcessResult result = new ProcessResult();
         final StringBuilder strBuffer = new StringBuilder(512);
         SendMessageResponseB2P.Builder builder = SendMessageResponseB2P.newBuilder();
         builder.setSuccess(false);
@@ -603,24 +605,23 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             builder.setErrMsg(certResult.errInfo);
             return builder.build();
         }
-        ParamCheckResult paramCheckResult =
-                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check clientId field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+                request.getClientId(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String producerId = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkExistTopicNameInfo(request.getTopicName(),
-                        request.getPartitionId(), this.metadataManager, strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String producerId = (String) result.retData1;
+        // get and check topicName and partitionId field
+        final int partitionId = request.getPartitionId();
+        if (!PBParameterUtils.getTopicNamePartIdInfo(request.getTopicName(),
+                partitionId, this.metadataManager, strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String reqTopic = (String) paramCheckResult.checkData;
-        final int partition = request.getPartitionId();
+        final String topicName = (String) result.retData1;
         String msgType = null;
         int msgTypeCode = -1;
         if (TStringUtils.isNotBlank(request.getMsgType())) {
@@ -645,14 +646,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
             builder.setErrCode(TErrCodeConstants.FORBIDDEN);
             builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
-                    .append(request.getCheckSum()).append(" of ").append(reqTopic)
+                    .append(request.getCheckSum()).append(" of ").append(topicName)
                     .append(" not equal to the data's checksum of ")
                     .append(checkSum).toString());
             return builder.build();
         }
         CertifiedResult authorizeResult =
                 serverAuthHandler.validProduceAuthorizeInfo(
-                        certResult.userName, reqTopic, msgType, rmtAddress);
+                        certResult.userName, topicName, msgType, rmtAddress);
         if (!authorizeResult.result) {
             builder.setErrCode(authorizeResult.errCode);
             builder.setErrMsg(authorizeResult.errInfo);
@@ -660,11 +661,11 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         }
         try {
             final MessageStore store =
-                    this.storeManager.getOrCreateMessageStore(reqTopic, partition);
+                    this.storeManager.getOrCreateMessageStore(topicName, partitionId);
             final AppendResult appendResult = new AppendResult();
             if (store.appendMsg(appendResult, dataLength, checkSum, msgData,
-                    msgTypeCode, request.getFlag(), partition, request.getSentAddr())) {
-                String baseKey = strBuffer.append(reqTopic)
+                    msgTypeCode, request.getFlag(), partitionId, request.getSentAddr())) {
+                String baseKey = strBuffer.append(topicName)
                         .append("#").append(AddressUtils.intToIp(request.getSentAddr()))
                         .append("#").append(tubeConfig.getHostName())
                         .append("#").append(request.getPartitionId())
@@ -712,6 +713,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     public RegisterResponseB2C consumerRegisterC2B(RegisterRequestC2B request,
                                                    final String rmtAddress,
                                                    boolean overtls) throws Throwable {
+        ProcessResult result = new ProcessResult();
         final StringBuilder strBuffer = new StringBuilder(512);
         RegisterResponseB2C.Builder builder = RegisterResponseB2C.newBuilder();
         builder.setSuccess(false);
@@ -727,28 +729,31 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             builder.setErrMsg(certResult.errInfo);
             return builder.build();
         }
-        ParamCheckResult paramCheckResult = PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check clientId field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+                request.getClientId(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String clientId = (String) paramCheckResult.checkData;
-        paramCheckResult
-                = PBParameterUtils.checkConsumeTopicName(request.getTopicName(), this.metadataManager, strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String clientId = (String) result.retData1;
+        // get and check topicName field
+        if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
+                this.metadataManager, strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String topicName = (String) paramCheckResult.checkData;
-        paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get the checked topicName value
+        final String topicName = (String) result.retData1;
+        // get and check groupName field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+                request.getGroupName(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String groupName = (String) paramCheckResult.checkData;
+        final String groupName = (String) result.retData1;
         boolean isRegister = (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER);
         Set<String> filterCondSet = new HashSet<>();
         if (request.getFilterCondStrList() != null && !request.getFilterCondStrList().isEmpty()) {
@@ -977,8 +982,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     public HeartBeatResponseB2C consumerHeartbeatC2B(HeartBeatRequestC2B request,
                                                      final String rmtAddress,
                                                      boolean overtls) throws Throwable {
-        final HeartBeatResponseB2C.Builder builder = HeartBeatResponseB2C.newBuilder();
+        ProcessResult result = new ProcessResult();
         final StringBuilder strBuffer = new StringBuilder(512);
+        final HeartBeatResponseB2C.Builder builder = HeartBeatResponseB2C.newBuilder();
         builder.setSuccess(false);
         if (!this.started.get()) {
             builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
@@ -992,22 +998,22 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             builder.setErrMsg(certResult.errInfo);
             return builder.build();
         }
-        ParamCheckResult paramCheckResult =
-                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check clientId field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+                request.getClientId(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String clientId = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String clientId = (String) result.retData1;
+        // get and check groupName field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+                request.getGroupName(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String groupName = (String) paramCheckResult.checkData;
+        final String groupName = (String) result.retData1;
         int reqQryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
         List<Partition> partitions =
@@ -1097,8 +1103,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B request,
                                                      final String rmtAddress,
                                                      boolean overtls) throws Throwable {
-        final CommitOffsetResponseB2C.Builder builder = CommitOffsetResponseB2C.newBuilder();
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
+        final CommitOffsetResponseB2C.Builder builder = CommitOffsetResponseB2C.newBuilder();
         builder.setSuccess(false);
         builder.setCurrOffset(-1);
         if (!this.started.get()) {
@@ -1106,32 +1113,31 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             builder.setErrMsg("StoreService temporary unavailable!");
             return builder.build();
         }
-        ParamCheckResult paramCheckResult =
-                PBParameterUtils.checkClientId(request.getClientId(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check clientId field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID,
+                request.getClientId(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String clientId = (String) paramCheckResult.checkData;
-        paramCheckResult =
-                PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        final String clientId = (String) result.retData1;
+        // get and check groupName field
+        if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME,
+                request.getGroupName(), strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String groupName = (String) paramCheckResult.checkData;
+        final String groupName = (String) result.retData1;
         int partitionId = request.getPartitionId();
-        paramCheckResult =
-                PBParameterUtils.checkExistTopicNameInfo(
-                        request.getTopicName(), partitionId, this.metadataManager, strBuffer);
-        if (!paramCheckResult.result) {
-            builder.setErrCode(paramCheckResult.errCode);
-            builder.setErrMsg(paramCheckResult.errMsg);
+        // get and check topicName and partitionId field
+        if (!PBParameterUtils.getTopicNamePartIdInfo(request.getTopicName(),
+                partitionId, this.metadataManager, strBuffer, result)) {
+            builder.setErrCode(result.errCode);
+            builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String topicName = (String) paramCheckResult.checkData;
+        final String topicName = (String) result.retData1;
         String partStr = getPartStr(groupName, topicName, partitionId);
         ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
         if (consumerNodeInfo == null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 2ed75c3..68a833b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -93,7 +93,11 @@ public enum WebFieldDef {
     CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
             "Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
     MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
-            "Record modification date", TBaseConstants.META_MAX_DATEVALUE_LENGTH);
+            "Record modification date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
+    HOSTNAME(27, "hostName", "hostName", WebFieldType.STRING,
+            "Host name information", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
+    CLIENTID(28, "clientId", "clientId", WebFieldType.STRING,
+            "Client ID", TBaseConstants.META_MAX_CLIENT_ID_LENGTH);
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index ed2e122..1c2d5ca 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -42,6 +42,9 @@ import org.apache.tubemq.server.master.nodemanage.nodeconsumer.ConsumerBandInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
+
 public class PBParameterUtils {
 
     private static final Logger logger = LoggerFactory.getLogger(PBParameterUtils.class);
@@ -438,85 +441,6 @@ public class PBParameterUtils {
     }
 
     /**
-     * Check the topic name.
-     *
-     * @param topicName      the topic name to check
-     * @param metadataManager the metadata manager which contains topic information
-     * @param strBuffer      the string buffer used to construct the check result
-     * @return the check result
-     */
-    public static ParamCheckResult checkConsumeTopicName(final String topicName,
-                                                         final MetadataManager metadataManager,
-                                                         final StringBuilder strBuffer) {
-        ParamCheckResult retResult = new ParamCheckResult();
-        if (TStringUtils.isBlank(topicName)) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
-                    "Request miss necessary topicName data!");
-            return retResult;
-        }
-        String tmpValue = topicName.trim();
-        if (metadataManager.getTopicMetadata(tmpValue) == null) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.FORBIDDEN,
-                    strBuffer.append("Topic ").append(tmpValue)
-                            .append(" not existed, please check your configure").toString());
-            return retResult;
-        }
-        retResult.setCheckData(tmpValue);
-        return retResult;
-    }
-
-    /**
-     * Check the existing topic name info
-     *
-     * @param topicName      the topic name to be checked.
-     * @param partitionId    the partition ID where the topic locates
-     * @param metadataManager the metadata manager which contains topic information
-     * @param strBuffer      the string buffer used to construct the check result
-     * @return the check result
-     */
-    public static ParamCheckResult checkExistTopicNameInfo(final String topicName,
-                                                           final int partitionId,
-                                                           final MetadataManager metadataManager,
-                                                           final StringBuilder strBuffer) {
-        ParamCheckResult retResult = new ParamCheckResult();
-        if (TStringUtils.isBlank(topicName)) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.BAD_REQUEST,
-                    "Request miss necessary topicName data!");
-            return retResult;
-        }
-        String tmpValue = topicName.trim();
-        TopicMetadata topicMetadata = metadataManager.getTopicMetadata(tmpValue);
-        if (topicMetadata == null) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.FORBIDDEN,
-                    strBuffer.append("Topic ").append(tmpValue)
-                            .append(" not existed, please check your configure").toString());
-            return retResult;
-        }
-        if (metadataManager.isClosedTopic(tmpValue)) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.FORBIDDEN,
-                    strBuffer.append("Topic ").append(tmpValue).append(" has been closed").toString());
-            return retResult;
-        }
-        int realPartition = partitionId < TBaseConstants.META_STORE_INS_BASE
-                ? partitionId : partitionId % TBaseConstants.META_STORE_INS_BASE;
-        if ((realPartition < 0) || (realPartition >= topicMetadata.getNumPartitions())) {
-            retResult.setCheckResult(false,
-                    TErrCodeConstants.FORBIDDEN,
-                    strBuffer.append("Partition ")
-                            .append(tmpValue).append("-").append(partitionId)
-                            .append(" not existed, please check your configure").toString());
-            return retResult;
-        }
-        retResult.setCheckData(tmpValue);
-        return retResult;
-    }
-
-    /**
      * Check the clientID.
      *
      * @param clientId  the client id to be checked
@@ -609,4 +533,82 @@ public class PBParameterUtils {
         result.setSuccResult(tmpValue);
         return result.success;
     }
+
+    /**
+     * Get and check the topic name: it must pass the TOPICNAME field check and exist in the metadata.
+     *
+     * @param topicName      the topic name to check
+     * @param metadataManager the metadata manager which contains topic information
+     * @param strBuffer      the string buffer used to construct the check result
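+     * @param result         the process result, carrying the checked topic name or the error information
+     * @return whether the check succeeded (same value as result.success)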
+     */
+    public static boolean getTopicNameParameter(String topicName,
+                                                MetadataManager metadataManager,
+                                                StringBuilder strBuffer,
+                                                ProcessResult result) {
+        if (!getStringParameter(WebFieldDef.TOPICNAME,
+                topicName, strBuffer, result)) {
+            return result.success;
+        }
+        String tmpValue = (String) result.retData1;
+        if (metadataManager.getTopicMetadata(tmpValue) == null) {
+            result.setFailResult(TErrCodeConstants.FORBIDDEN,
+                    strBuffer.append(WebFieldDef.TOPICNAME.name)
+                            .append(" ").append(tmpValue)
+                            .append(" not existed, please check your configure").toString());
+            strBuffer.delete(0, strBuffer.length());
+        }
+        return result.success;
+    }
+
+    /**
+     * Get and check the topic name and partition ID against the topic's metadata.
+     *
+     * @param topicName      the topic name to be checked.
+     * @param partitionId    the partition ID to be checked against the topic's partition count
+     * @param metadataManager the metadata manager which contains topic information
+     * @param strBuffer      the string buffer used to construct the check result
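+     * @param result         the process result, carrying the checked topic name or the error information
+     * @return whether the check succeeded (same value as result.success)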
+     */
+    public static boolean getTopicNamePartIdInfo(String topicName, int partitionId,
+                                                 MetadataManager metadataManager,
+                                                 StringBuilder strBuffer,
+                                                 ProcessResult result) {
+        if (!getStringParameter(WebFieldDef.TOPICNAME,
+                topicName, strBuffer, result)) {
+            return result.success;
+        }
+        String tmpValue = (String) result.retData1;
+        TopicMetadata topicMetadata = metadataManager.getTopicMetadata(tmpValue);
+        if (topicMetadata == null) {
+            result.setFailResult(TErrCodeConstants.FORBIDDEN,
+                    strBuffer.append(WebFieldDef.TOPICNAME.name)
+                            .append(" ").append(tmpValue)
+                            .append(" not existed, please check your configure").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return result.success;
+        }
+        if (metadataManager.isClosedTopic(tmpValue)) {
+            result.setFailResult(TErrCodeConstants.FORBIDDEN,
+                    strBuffer.append(WebFieldDef.TOPICNAME.name)
+                            .append(" ").append(tmpValue)
+                            .append(" has been closed").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return result.success;
+        }
+        int realPartition = partitionId < TBaseConstants.META_STORE_INS_BASE
+                ? partitionId : partitionId % TBaseConstants.META_STORE_INS_BASE;
+        if ((realPartition < 0) || (realPartition >= topicMetadata.getNumPartitions())) {
+            result.setFailResult(TErrCodeConstants.FORBIDDEN,
+                    strBuffer.append(WebFieldDef.PARTITIONID.name)
+                            .append(" ").append(tmpValue).append("-").append(partitionId)
+                            .append(" not existed, please check your configure").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return result.success;
+        }
+        result.setSuccResult(tmpValue);
+        return result.success;
+    }
 }