You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:01 UTC

[incubator-tubemq] 03/49: [TUBEMQ-430]Optimizing the implementation of HTTP API for broker (#338)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 9598a42a8675657cdf9190d6c75b1a21b1be70e0
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Dec 4 10:16:15 2020 +0800

    [TUBEMQ-430]Optimizing the implementation of HTTP API for broker (#338)
---
 .../tubemq/client/config/ConsumerConfig.java       |  12 +
 .../client/consumer/PullMessageConsumer.java       |   7 +
 .../org/apache/tubemq/corebase/utils/RegexDef.java |  60 ++++
 .../server/broker/web/AbstractWebHandler.java      |  83 +++++
 .../server/broker/web/BrokerAdminServlet.java      | 333 ++++++++++-----------
 .../tubemq/server/common/fielddef/CliArgDef.java   | 111 +++++++
 .../tubemq/server/common/fielddef/WebFieldDef.java | 159 ++++++++++
 .../tubemq/server/common/utils/ProcessResult.java  |  53 ++++
 .../server/common/utils/WebParameterUtils.java     | 320 +++++++++++++++++++-
 .../tubemq/server/common/webbase/WebFieldType.java |  60 ++++
 .../webbase/WebMethodMapper.java}                  |  12 +-
 .../server/master/web/action/screen/Webapi.java    |   6 +-
 .../master/web/handler/AbstractWebHandler.java     |   2 +-
 13 files changed, 1037 insertions(+), 181 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index d8b63fb..184da79 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -124,6 +124,18 @@ public class ConsumerConfig extends TubeClientConfig {
         return pullConsumeReadyWaitPeriodMs;
     }
 
+    // setPullConsumeReadyWaitPeriodMs() use note:
+    // The value range is [negative value, 0, positive value] and the value directly determines
+    // the behavior of the PullMessageConsumer.GetMessage() function:
+    // 1. if it is set to a negative value, it means that the GetMessage() calling thread will
+    //    be blocked forever and will not return until the consumption conditions are met;
+    // 2. if If it is set to 0, it means that the GetMessage() calling thread will only block
+    //    the ConsumerConfig.getPullConsumeReadyChkSliceMs() interval when the consumption
+    //    conditions are not met and then return;
+    // 3. if it is set to a positive number, it will not meet the current user usage (including
+    //    unused partitions or allocated partitions, but these partitions do not meet the usage
+    //    conditions), the GetMessage() calling thread will be blocked until the total time of
+    //    ConsumerConfig.getPullConsumeReadyWaitPeriodMs expires
     public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) {
         this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs;
     }
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
index af5d50f..d9c3baf 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
@@ -28,6 +28,13 @@ public interface PullMessageConsumer extends MessageConsumer {
     PullMessageConsumer subscribe(String topic,
                                   TreeSet<String> filterConds) throws TubeClientException;
 
+    // getMessage() use note:
+    // This getMessage have a blocking situation: when the current
+    // consumer consumption situation is not satisfied (including
+    // without partitions to consumption, or allocated partitions but
+    // the partitions do not meet the consumption situation),
+    // the call will sleep at intervals of ConsumerConfig.getPullConsumeReadyChkSliceMs(),
+    // until the total time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs
     ConsumerResult getMessage() throws TubeClientException;
 
     ConsumerResult confirmConsume(final String confirmContext,
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java
new file mode 100644
index 0000000..842c6a9
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+
+
+public enum RegexDef {
+
+    TMP_FILTER(0, "^[_A-Za-z0-9]+$",
+            "must only contain characters,numbers,and underscores"),
+    TMP_STRING(1, "^[a-zA-Z]\\w+$",
+                       "must begin with a letter,can only contain characters,numbers,and underscores"),
+    TMP_NUMBER(2, "^-?[0-9]\\d*$", "must only contain numbers"),
+    TMP_GROUP(3, "^[a-zA-Z][\\w-]+$",
+                       "must begin with a letter,can only contain characters,numbers,hyphen,and underscores"),
+    TMP_CONSUMERID(4, "^[_A-Za-z0-9\\.\\-]+$",
+                      "must begin with a letter,can only contain characters,numbers,dot,scores,and underscores"),
+    TMP_IPV4ADDRESS(5,
+            "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))",
+            "must matches the IP V4 address regulation");
+
+
+    private final int id;
+    private final String pattern;
+    private final String errMsgTemp;
+
+
+    RegexDef(int id, String pattern, String errMsgTemp) {
+        this.id = id;
+        this.pattern = pattern;
+        this.errMsgTemp = errMsgTemp;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getPattern() {
+        return pattern;
+    }
+
+    public String getErrMsgTemp() {
+        return errMsgTemp;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
new file mode 100644
index 0000000..b44d88c
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.web;
+
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
+import java.io.IOException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.tubemq.server.broker.TubeBroker;
+import org.apache.tubemq.server.common.webbase.WebMethodMapper.WebApiRegInfo;
+
+
+
+public abstract class AbstractWebHandler extends HttpServlet {
+
+    protected final TubeBroker broker;
+
+    public AbstractWebHandler(TubeBroker broker) {
+        this.broker = broker;
+    }
+
+    @Override
+    protected void doGet(HttpServletRequest req,
+                         HttpServletResponse resp) throws IOException {
+        doPost(req, resp);
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req,
+                          HttpServletResponse resp) throws IOException {
+        StringBuilder strBuffer = new StringBuilder(1024);
+
+        try {
+            String method = req.getParameter("method");
+            if (method == null) {
+                strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+                        .append("Please take with method parameter! \"}");
+            } else {
+                WebApiRegInfo webApiRegInfo = getWebApiRegInfo(true, method);
+                if (webApiRegInfo == null) {
+                    strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+                            .append("Unsupported method ").append(method).append("}");
+                } else {
+                    strBuffer = (StringBuilder) webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req);
+                }
+            }
+        } catch (Throwable e) {
+            strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+                    .append("Bad request from server: ")
+                    .append(e.getMessage())
+                    .append("\"}");
+        }
+        resp.getWriter().write(strBuffer.toString());
+        resp.setCharacterEncoding(req.getCharacterEncoding());
+        resp.setStatus(HttpServletResponse.SC_OK);
+        resp.flushBuffer();
+    }
+
+    public abstract void registerWebApiMethod();
+
+    protected void innRegisterWebMethod(String webMethodName,
+                                        String clsMethodName) {
+        registerWebMethod(true, webMethodName, clsMethodName, this);
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index e91ae79..ab43c2e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -17,18 +17,12 @@
 
 package org.apache.tubemq.server.broker.web;
 
-import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.TubeBroker;
@@ -36,83 +30,44 @@ import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.offset.OffsetService;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
 
 /***
  * Broker's web servlet. Used for admin operation, like query consumer's status etc.
  */
-public class BrokerAdminServlet extends HttpServlet {
-    private final TubeBroker broker;
+public class BrokerAdminServlet extends AbstractWebHandler {
 
-    public BrokerAdminServlet(TubeBroker broker) {
-        this.broker = broker;
-    }
 
-    @Override
-    protected void doGet(HttpServletRequest req,
-                         HttpServletResponse resp) throws ServletException, IOException {
-        doPost(req, resp);
+    public BrokerAdminServlet(TubeBroker broker) {
+        super(broker);
+        registerWebApiMethod();
     }
 
     @Override
-    protected void doPost(HttpServletRequest req,
-                          HttpServletResponse resp) throws ServletException, IOException {
-        StringBuilder sBuilder = new StringBuilder(1024);
-        try {
-            String method = req.getParameter("method");
-            if ("admin_manual_set_current_offset".equals(method)) {
-                // manual set offset
-                sBuilder = this.adminManualSetCurrentOffSet(req);
-            } else if ("admin_query_group_offset".equals(method)) {
-                // query consumer group's offset
-                sBuilder = this.adminQueryCurrentGroupOffSet(req);
-            } else if ("admin_snapshot_message".equals(method)) {
-                // query snapshot message
-                sBuilder = this.adminQuerySnapshotMessageSet(req);
-            } else if ("admin_query_broker_all_consumer_info".equals(method)) {
-                // query broker's all consumer info
-                sBuilder = this.adminQueryBrokerAllConsumerInfo(req);
-            } else if ("admin_query_broker_memstore_info".equals(method)) {
-                // get memory store status info
-                sBuilder = this.adminGetMemStoreStatisInfo(req);
-            } else if ("admin_query_broker_all_store_info".equals(method)) {
-                // query broker's all message store info
-                sBuilder = this.adminQueryBrokerAllMessageStoreInfo(req);
-            } else if ("admin_query_consumer_regmap".equals(method)) {
-                Map<String, ConsumerNodeInfo> map =
-                        broker.getBrokerServiceServer().getConsumerRegisterMap();
-                int totalCnt = 0;
-                sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
-                        .append(",\"dataSet\":[");
-                for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
-                    if (entry.getKey() == null || entry.getValue() == null) {
-                        continue;
-                    }
-                    if (totalCnt > 0) {
-                        sBuilder.append(",");
-                    }
-                    sBuilder.append("{\"Partition\":\"").append(entry.getKey())
-                            .append("\",\"Consumer\":\"")
-                            .append(entry.getValue().getConsumerId())
-                            .append("\",\"index\":").append(++totalCnt).append("}");
-                }
-                sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
-            } else {
-                sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                        .append("Invalid request: Unsupported method!")
-                        .append("\"}");
-            }
-
-        } catch (Exception e) {
-            sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                    .append("Bad request from server: ")
-                    .append(e.getMessage())
-                    .append("\"}");
-        }
-        resp.getWriter().write(sBuilder.toString());
-        resp.setCharacterEncoding(req.getCharacterEncoding());
-        resp.setStatus(HttpServletResponse.SC_OK);
-        resp.flushBuffer();
+    public void registerWebApiMethod() {
+        // query consumer group's offset
+        innRegisterWebMethod("admin_query_group_offset",
+                "adminQueryCurrentGroupOffSet");
+        // query snapshot message
+        innRegisterWebMethod("admin_snapshot_message",
+                "adminQuerySnapshotMessageSet");
+        // query broker's all consumer info
+        innRegisterWebMethod("admin_query_broker_all_consumer_info",
+                "adminQueryBrokerAllConsumerInfo");
+        // get memory store status info
+        innRegisterWebMethod("admin_query_broker_memstore_info",
+                "adminGetMemStoreStatisInfo");
+        // query broker's all message store info
+        innRegisterWebMethod("admin_query_broker_all_store_info",
+                "adminQueryBrokerAllMessageStoreInfo");
+        // query consumer register info
+        innRegisterWebMethod("admin_query_consumer_regmap",
+                "adminQueryConsumerRegisterInfo");
+        // manual set offset
+        innRegisterWebMethod("admin_manual_set_current_offset",
+                "adminManualSetCurrentOffSet");
     }
 
     /***
@@ -122,13 +77,16 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception {
+    public StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception {
         int index = 0;
         StringBuilder sBuilder = new StringBuilder(1024);
-        String groupNameInput =
-                WebParameterUtils.validGroupParameter("groupName",
-                        req.getParameter("groupName"),
-                        TBaseConstants.META_MAX_GROUPNAME_LENGTH, false, null);
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        Set<String> groupNameSet = (Set<String>) result.retData1;
+
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
         Map<String, ConsumerNodeInfo> map =
                 broker.getBrokerServiceServer().getConsumerRegisterMap();
@@ -139,7 +97,7 @@ public class BrokerAdminServlet extends HttpServlet {
             String[] partitionIdArr =
                     entry.getKey().split(TokenConstants.ATTR_SEP);
             String groupName = partitionIdArr[0];
-            if (!TStringUtils.isBlank(groupNameInput) && (!groupNameInput.equals(groupName))) {
+            if (!groupNameSet.isEmpty() && !groupNameSet.contains(groupName)) {
                 continue;
             }
             String topicName = partitionIdArr[1];
@@ -209,22 +167,23 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req)
+    public StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req)
             throws Exception {
         StringBuilder sBuilder = new StringBuilder(1024);
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
-        String topicNameInput =
-                WebParameterUtils.validStringParameter("topicName",
-                        req.getParameter("topicName"),
-                        TBaseConstants.META_MAX_TOPICNAME_LENGTH, false, null);
         Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
                 broker.getStoreManager().getMessageStores();
         int index = 0;
         int recordId = 0;
         for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) {
-            if (TStringUtils.isBlank(entry.getKey()) ||
-                    (TStringUtils.isNotBlank(topicNameInput)
-                            && !topicNameInput.equals(entry.getKey()))) {
+            if (TStringUtils.isBlank(entry.getKey())
+                    || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) {
                 continue;
             }
             if (recordId > 0) {
@@ -276,47 +235,27 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
+    public StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
         StringBuilder sBuilder = new StringBuilder(1024);
-        Set<String> batchTopicNames = new HashSet<>();
-        String inputTopicName = req.getParameter("topicName");
-        if (TStringUtils.isNotBlank(inputTopicName)) {
-            inputTopicName = inputTopicName.trim();
-            String[] strTopicNames =
-                    inputTopicName.split(TokenConstants.ARRAY_SEP);
-            for (int i = 0; i < strTopicNames.length; i++) {
-                if (TStringUtils.isBlank(strTopicNames[i])) {
-                    continue;
-                }
-                String topicName = strTopicNames[i].trim();
-                if (topicName.length() > TBaseConstants.META_MAX_TOPICNAME_LENGTH) {
-                    sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                            .append("Invalid parameter: the max length of ")
-                            .append(topicName).append(" in topicName parameter over ")
-                            .append(TBaseConstants.META_MAX_TOPICNAME_LENGTH)
-                            .append(" characters\"}");
-                    return sBuilder;
-                }
-                if (!topicName.matches(TBaseConstants.META_TMP_STRING_VALUE)) {
-                    sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
-                            .append("Invalid parameter: the value of ").append(topicName)
-                            .append(" in topicName parameter must begin with a letter,")
-                            .append(" can only contain characters,numbers,and underscores!\"}");
-                    return sBuilder;
-                }
-                batchTopicNames.add(topicName);
-            }
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        result = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
         }
-        boolean requireRefresh =
-                WebParameterUtils.validBooleanDataParameter("needRefresh",
-                        req.getParameter("needRefresh"), false, false);
+        boolean requireRefresh = (boolean) result.retData1;
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"detail\":[");
         Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores =
                 broker.getStoreManager().getMessageStores();
         int recordId = 0, index = 0;
         for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) {
             if (TStringUtils.isBlank(entry.getKey())
-                    || (!batchTopicNames.isEmpty() && !batchTopicNames.contains(entry.getKey()))) {
+                    || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) {
                 continue;
             }
             String topicName = entry.getKey();
@@ -354,25 +293,38 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception {
+    public StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception {
         StringBuilder sBuilder = new StringBuilder(512);
-        final String topicName =
-                WebParameterUtils.validStringParameter("topicName",
-                        req.getParameter("topicName"),
-                        TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
-        final String groupName =
-                WebParameterUtils.validGroupParameter("groupName",
-                        req.getParameter("groupName"),
-                        TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
-        final String modifyUser =
-                WebParameterUtils.validStringParameter("modifyUser",
-                        req.getParameter("modifyUser"), 64, true, "");
-        int partitionId =
-                WebParameterUtils.validIntDataParameter("partitionId",
-                        req.getParameter("partitionId"), true, -1, 0);
-        long manualOffset =
-                WebParameterUtils.validLongDataParameter("manualOffset",
-                        req.getParameter("manualOffset"), true, -1);
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String topicName = (String) result.retData1;
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String groupName = (String) result.retData1;
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String modifyUser = (String) result.retData1;
+        result = WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        int partitionId = (Integer) result.retData1;
+        result = WebParameterUtils.getLongParamValue(req,
+                WebFieldDef.MANUALOFFSET, true, -1);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        long manualOffset = (Long) result.retData1;
         List<String> topicList = broker.getMetadataManager().getTopics();
         if (!topicList.contains(topicName)) {
             sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -430,28 +382,39 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception {
+    public StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception {
         StringBuilder sBuilder = new StringBuilder(1024);
-        final String topicName =
-                WebParameterUtils.validStringParameter("topicName",
-                        req.getParameter("topicName"),
-                        TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
-        final int partitionId =
-                WebParameterUtils.validIntDataParameter("partitionId",
-                        req.getParameter("partitionId"), false, -1, 0);
-        int msgCount =
-                WebParameterUtils.validIntDataParameter("msgCount",
-                        req.getParameter("msgCount"), false, 3, 3);
-        msgCount = msgCount < 1 ? 1 : msgCount;
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String topicName = (String) result.retData1;
+        result = WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        int partitionId = (Integer) result.retData1;
+        result = WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.MSGCOUNT, false, 3, 3);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        int msgCount = (Integer) result.retData1;
+        msgCount = Math.max(msgCount, 1);
         if (msgCount > 50) {
             sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
                     .append("Over max allowed msgCount value, allowed count is 50!")
                     .append("\"}");
             return sBuilder;
         }
-        Set<String> filterCondStrSet =
-                WebParameterUtils.checkAndGetFilterCondSet(req.getParameter("filterConds"),
-                        false, true, sBuilder);
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.FILTERCONDS, false, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        Set<String> filterCondStrSet = (Set<String>) result.retData1;
         sBuilder = broker.getBrokerServiceServer()
                 .getMessageSnapshot(topicName, partitionId, msgCount, filterCondStrSet, sBuilder);
         return sBuilder;
@@ -464,20 +427,34 @@ public class BrokerAdminServlet extends HttpServlet {
      * @return
      * @throws Exception
      */
-    private StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req)
+    public StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req)
             throws Exception {
         StringBuilder sBuilder = new StringBuilder(1024);
-        String topicName =
-                WebParameterUtils.validStringParameter("topicName",
-                        req.getParameter("topicName"),
-                        TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, "");
-        String groupName =
-                WebParameterUtils.validGroupParameter("groupName",
-                        req.getParameter("groupName"),
-                        TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, "");
-        int partitionId =
-                WebParameterUtils.validIntDataParameter("partitionId",
-                        req.getParameter("partitionId"), true, -1, 0);
+        ProcessResult result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String topicName = (String) result.retData1;
+        result = WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        final String groupName = (String) result.retData1;
+        result = WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        int partitionId = (Integer) result.retData1;
+
+        result = WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.REQUIREREALOFFSET, false, false);
+        if (!result.success) {
+            return WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+        }
+        boolean requireRealOffset = (Boolean) result.retData1;
         List<String> topicList = broker.getMetadataManager().getTopics();
         if (!topicList.contains(topicName)) {
             sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
@@ -499,9 +476,6 @@ public class BrokerAdminServlet extends HttpServlet {
                     .append("\"}");
             return sBuilder;
         }
-        boolean requireRealOffset =
-                WebParameterUtils.validBooleanDataParameter("requireRealOffset",
-                        req.getParameter("requireRealOffset"), false, false);
         long tmpOffset = offsetService.getTmpOffset(groupName, topicName, partitionId);
         long minDataOffset = store.getDataMinOffset();
         long maxDataOffset = store.getDataMaxOffset();
@@ -538,5 +512,28 @@ public class BrokerAdminServlet extends HttpServlet {
         return sBuilder;
     }
 
+    public StringBuilder adminQueryConsumerRegisterInfo(HttpServletRequest req) {
+        StringBuilder sBuilder = new StringBuilder(1024);
+        Map<String, ConsumerNodeInfo> map =
+                broker.getBrokerServiceServer().getConsumerRegisterMap();
+        int totalCnt = 0;
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
+                .append(",\"dataSet\":[");
+        for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
+            if (entry.getKey() == null || entry.getValue() == null) {
+                continue;
+            }
+            if (totalCnt > 0) {
+                sBuilder.append(",");
+            }
+            sBuilder.append("{\"Partition\":\"").append(entry.getKey())
+                    .append("\",\"Consumer\":\"")
+                    .append(entry.getValue().getConsumerId())
+                    .append("\",\"index\":").append(++totalCnt).append("}");
+        }
+        sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}");
+        return sBuilder;
+    }
+
 
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
new file mode 100644
index 0000000..b2d9327
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.fielddef;
+
+
+
+public enum CliArgDef {
+
+    // Note: Due to compatibility considerations,
+    //      the defined fields in the scheme are forbidden to be modified,
+    //      only new fields can be added
+
+    HELP("h", "help", "Print usage information."),
+    VERSION("v", "version", "Display TubeMQ version."),
+    MASTERSERVER("master-servers", "master-servers",
+            "String: format is master1_ip:port[,master2_ip:port]",
+            "The master address(es) to connect to."),
+    MASTERURL("master-url", "master-url",
+            "String: format is http://master_ip:master_webport/",
+            "Master Service URL to which to connect.(default: http://localhost:8080/)"),
+    BROKERURL("broker-url", "broker-url",
+            "String: format is http://broker_ip:broker_webport/",
+            "Broker Service URL to which to connect.(default: http://localhost:8081/)"),
+    MESSAGES("messages", "messages",
+            "Long: count",
+            "The number of messages to send or consume, If not set, production or consumption is continual."),
+    MSGDATASIZE("msg-data-size", "message-data-size",
+            "Int: message size",
+            "message's data size in bytes. Note that you must provide exactly"
+                    + " one of --msg-data-size or --payload-file."),
+    PAYLOADFILE("payload-file", "payload-file",
+            "String: payload file path",
+            "file to read the message payloads from. This works only for"
+                    + " UTF-8 encoded text files. Payloads will be read from this"
+                    + " file and a payload will be randomly selected when sending"
+                    + " messages. Note that you must provide exactly one"
+                    + " of --msg-data-size or --payload-file."),
+    PAYLOADDELIM("payload-delimiter", "payload-delimiter",
+            "String: payload data's delimiter",
+            "provides delimiter to be used when --payload-file is provided."
+                    + " Defaults to new line. Note that this parameter will be"
+                    + " ignored if --payload-file is not provided. (default: \\n)"),
+    PRDTOPIC("topic", "topicName",
+            "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]",
+            "The topic(s) to produce messages to."),
+    CNSTOPIC("topic", "topicName",
+            "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]",
+            "The topic(s) to consume on."),
+    RPCTIMEOUT("timeout", "timeout",
+            "Long: milliseconds",
+            "The maximum duration between request and response in milliseconds. (default: 10000)"),
+    GROUP("group", "groupName",
+            "String: consumer group",
+            "The consumer group name of the consumer."),
+    CLIENTCOUNT("client-num", "client-num",
+            "Int: client count",
+            "Number of consumers to started."),
+    PULLMODEL("pull-model", "pull-model",
+            "Pull consumption model."),
+    PUSHMODEL("push-model", "push-model",
+            "Push consumption model."),
+    FETCHTHREADS("num-fetch-threads", "num-fetch-threads",
+            "Integer: count",
+            "Number of fetch threads, default: num of cpu count."),
+    FROMLATEST("from-latest", "from-latest",
+            "Start to consume from the latest message present in the log."),
+    FROMBEGINNING("from-beginning", "from-beginning",
+            "If the consumer does not already have an established offset to consume from,"
+                    + " start with the earliest message present in the log rather than the latest message."),
+    OUTPUTINTERVAL("output-interval", "output-interval",
+            "Integer: interval_ms",
+            "Interval in milliseconds at which to print progress info. (default: 5000)");
+
+
+    CliArgDef(String opt, String longOpt, String optDesc) {
+        this(opt, longOpt, false, "", optDesc);
+    }
+
+    CliArgDef(String opt, String longOpt, String argDesc, String optDesc) {
+        this(opt, longOpt, true, argDesc, optDesc);
+    }
+
+    CliArgDef(String opt, String longOpt, boolean hasArg, String argDesc, String optDesc) {
+        this.opt = opt;
+        this.longOpt = longOpt;
+        this.hasArg = hasArg;
+        this.argDesc = argDesc;
+        this.optDesc = optDesc;
+    }
+
+    public final String opt;
+    public final String longOpt;
+    public final boolean hasArg;
+    public final String argDesc;
+    public final String optDesc;
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
new file mode 100644
index 0000000..1025ba0
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.fielddef;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.RegexDef;
+import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.webbase.WebFieldType;
+
+
+public enum WebFieldDef {
+
+    // Note: Due to compatibility considerations,
+    //      the defined fields in the scheme are forbidden to be modified,
+    //      only new fields can be added
+
+    TOPICNAME(0, "topicName", "topic", WebFieldType.STRING,
+            "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH,
+            RegexDef.TMP_STRING),
+    GROUPNAME(1, "groupName", "group", WebFieldType.STRING,
+            "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+            RegexDef.TMP_GROUP),
+    PARTITIONID(2, "partitionId", "pid", WebFieldType.INT,
+            "Partition id", RegexDef.TMP_NUMBER),
+    CREATEUSER(3, "createUser", "cur", WebFieldType.STRING,
+            "Record creator", TBaseConstants.META_MAX_USERNAME_LENGTH,
+            RegexDef.TMP_STRING),
+    MODIFYUSER(4, "modifyUser", "mur", WebFieldType.STRING,
+            "Record modifier", TBaseConstants.META_MAX_USERNAME_LENGTH,
+            RegexDef.TMP_STRING),
+    MANUALOFFSET(5, "manualOffset", "offset", WebFieldType.LONG,
+            "Reset offset value", RegexDef.TMP_NUMBER),
+    MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT,
+            "Number of returned messages", RegexDef.TMP_NUMBER),
+    FILTERCONDS(7, "filterConds", "flts", WebFieldType.COMPSTRING,
+            "Filter condition items", TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH,
+            TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT, RegexDef.TMP_FILTER),
+    REQUIREREALOFFSET(8, "requireRealOffset", "dko", WebFieldType.BOOLEAN,
+            "Require return disk offset details"),
+    NEEDREFRESH(9, "needRefresh", "nrf", WebFieldType.BOOLEAN,
+            "Require refresh data"),
+    COMPSGROUPNAME(10, "groupName", "group", WebFieldType.COMPSTRING,
+            "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
+                   RegexDef.TMP_GROUP),
+    COMPSTOPICNAME(11, "topicName", "topic", WebFieldType.COMPSTRING,
+            "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH,
+            RegexDef.TMP_STRING),
+    COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT,
+            "Partition id", RegexDef.TMP_NUMBER);
+
+
+
+
+    public final int id;
+    public final String name;
+    public final String shortName;
+    public final WebFieldType type;
+    public final String desc;
+    public final boolean compVal;
+    public final String splitToken;
+    public final int itemMaxCnt;
+    public final int valMaxLen;
+    public final boolean regexCheck;
+    public final RegexDef regexDef;
+
+
+    WebFieldDef(int id, String name, String shortName, WebFieldType type, String desc) {
+        this(id, name, shortName, type, desc, TBaseConstants.META_VALUE_UNDEFINED,
+                TBaseConstants.META_VALUE_UNDEFINED, false, null);
+    }
+
+    WebFieldDef(int id, String name, String shortName, WebFieldType type,
+                String desc, RegexDef regexDef) {
+        this(id, name, shortName, type, desc,
+                TBaseConstants.META_VALUE_UNDEFINED, regexDef);
+    }
+
+    WebFieldDef(int id, String name, String shortName, WebFieldType type,
+                String desc, int valMaxLen, RegexDef regexDef) {
+        this(id, name, shortName, type, desc, valMaxLen,
+                TServerConstants.CFG_BATCH_RECORD_OPERATE_MAX_COUNT,
+                true, regexDef);
+    }
+
+    WebFieldDef(int id, String name, String shortName, WebFieldType type,
+                String desc, int valMaxLen, int itemMaxCnt, RegexDef regexDef) {
+        this(id, name, shortName, type, desc, valMaxLen,
+                itemMaxCnt, true, regexDef);
+    }
+
+    WebFieldDef(int id, String name, String shortName, WebFieldType type,
+                String desc, int valMaxLen, int itemMaxCnt,
+                boolean regexChk, RegexDef regexDef) {
+        this.id = id;
+        this.name = name;
+        this.shortName = shortName;
+        this.type = type;
+        this.desc = desc;
+        if (isCompFieldType()) {
+            this.compVal = true;
+            this.splitToken = TokenConstants.ARRAY_SEP;
+            this.itemMaxCnt = itemMaxCnt;
+        } else {
+            this.compVal = false;
+            this.splitToken = "";
+            this.itemMaxCnt = TBaseConstants.META_VALUE_UNDEFINED;
+        }
+        this.valMaxLen = valMaxLen;
+        this.regexCheck = regexChk;
+        this.regexDef = regexDef;
+    }
+
+    public boolean isCompFieldType() {
+        return (this.type == WebFieldType.COMPINT
+                || this.type == WebFieldType.COMPSTRING);
+    }
+
+    private static final WebFieldDef[] WEB_FIELD_DEFS;
+    private static final int MIN_FIELD_ID = 0;
+    public static final int MAX_FIELD_ID;
+
+    static {
+        int maxId = -1;
+        for (WebFieldDef fieldDef : WebFieldDef.values()) {
+            maxId = Math.max(maxId, fieldDef.id);
+        }
+        WebFieldDef[] idToType = new WebFieldDef[maxId + 1];
+        for (WebFieldDef fieldDef : WebFieldDef.values()) {
+            idToType[fieldDef.id] = fieldDef;
+        }
+        WEB_FIELD_DEFS = idToType;
+        MAX_FIELD_ID = maxId;
+    }
+
+    public static WebFieldDef valueOf(int fieldId) {
+        if (fieldId >= MIN_FIELD_ID && fieldId <= MAX_FIELD_ID) {
+            return WEB_FIELD_DEFS[fieldId];
+        }
+        throw new IllegalArgumentException(
+                String.format("Unexpected WebFieldDef id `%s`, it should be between `%s` " +
+                "and `%s` (inclusive)", fieldId, MIN_FIELD_ID, MAX_FIELD_ID));
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
new file mode 100644
index 0000000..3688b1c
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.utils;
+
+import org.apache.tubemq.corebase.TErrCodeConstants;
+
+public class ProcessResult {
+    public boolean success = true;
+    public int errCode = TErrCodeConstants.SUCCESS;
+    public String errInfo = "";
+    public Object retData1 = null;
+
+    public ProcessResult() {
+
+    }
+
+    public ProcessResult(Object retData) {
+        this.success = true;
+        this.retData1 = retData;
+    }
+
+    public ProcessResult(int errCode, String errInfo) {
+        this.success = false;
+        this.errCode = errCode;
+        this.errInfo = errInfo;
+    }
+
+    public void setFailResult(int errCode, final String errMsg) {
+        this.success = false;
+        this.errCode = errCode;
+        this.errInfo = errMsg;
+    }
+
+    public void setSuccResult(Object retData) {
+        this.success = true;
+        this.retData1 = retData;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index f4ce5bf..b7d5d41 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -30,18 +30,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
 
 
+
 public class WebParameterUtils {
 
     private static final List<String> allowedDelUnits = Arrays.asList("s", "m", "h");
@@ -153,6 +156,24 @@ public class WebParameterUtils {
     /**
      * Parse the parameter value from an object value to string value
      *
+     * @param req          http servlet request
+     * @param paramName    the parameter name
+     * @param paramMaxLen  the max length of string to return
+     * @param required     a boolean value represent whether the parameter is must required
+     * @param defaultValue a default value returned if failed to parse value from the given object
+     * @return a string value of parameter
+     * @throws Exception if failed to parse the object
+     */
+    public static String validStringParameter(HttpServletRequest req, String paramName,
+                                              int paramMaxLen, boolean required,
+                                              String defaultValue) throws Exception {
+        return validStringParameter(paramName,
+                req.getParameter(paramName), paramMaxLen, required, defaultValue);
+    }
+
+    /**
+     * Parse the parameter value from an object value to string value
+     *
      * @param paramName    the parameter name
      * @param paramValue   the parameter value which is an object for parsing
      * @param paramMaxLen  the max length of string to return
@@ -161,9 +182,11 @@ public class WebParameterUtils {
      * @return a string value of parameter
      * @throws Exception if failed to parse the object
      */
-    public static String validStringParameter(String paramName, String paramValue, int paramMaxLen,
-                                              boolean required, String defaultValue) throws Exception {
-        String tmpParamValue = checkParamCommonRequires(paramName, paramValue, required);
+    public static String validStringParameter(String paramName, String paramValue,
+                                              int paramMaxLen, boolean required,
+                                              String defaultValue) throws Exception {
+        String tmpParamValue =
+                checkParamCommonRequires(paramName, paramValue, required);
         if (TStringUtils.isBlank(tmpParamValue)) {
             return defaultValue;
         }
@@ -214,6 +237,297 @@ public class WebParameterUtils {
         return tmpParamValue;
     }
 
+    public static StringBuilder buildFailResult(StringBuilder strBuffer, String errMsg) {
+        return strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+                .append(errMsg).append("\"}");
+    }
+
+    /**
+     * Parse the parameter value from an object value to a long value
+     *
+     * @param req        Http Servlet Request
+     * @param fieldDef   the parameter field definition
+     * @param required   a boolean value represent whether the parameter is must required
+     * @param defValue   a default value returned if failed to parse value from the given object
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getLongParamValue(HttpServletRequest req,
+                                                  WebFieldDef fieldDef,
+                                                  boolean required,
+                                                  long defValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        String paramValue = (String) procResult.retData1;
+        if (paramValue == null) {
+            procResult.setSuccResult(defValue);
+            return procResult;
+        }
+        try {
+            long paramIntVal = Long.parseLong(paramValue);
+            procResult.setSuccResult(paramIntVal);
+        } catch (Throwable e) {
+            procResult.setFailResult(400,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name).append(" parse error: ")
+                            .append(e.getMessage()).toString());
+        }
+        return procResult;
+    }
+
+    /**
+     * Parse the parameter value from an object value to a integer value
+     *
+     * @param req        Http Servlet Request
+     * @param fieldDef   the parameter field definition
+     * @param required   a boolean value represent whether the parameter is must required
+     * @param defValue   a default value returned if failed to parse value from the given object
+     * @param minValue   min value required
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getIntParamValue(HttpServletRequest req,
+                                                 WebFieldDef fieldDef,
+                                                 boolean required,
+                                                 int defValue,
+                                                 int minValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        if (fieldDef.isCompFieldType()) {
+            Set<Integer> tgtValueSet = new HashSet<Integer>();
+            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            if (valItemSet.isEmpty()) {
+                tgtValueSet.add(defValue);
+                procResult.setSuccResult(tgtValueSet);
+                return procResult;
+            }
+            ProcessResult procRet = new ProcessResult();
+            for (String itemVal : valItemSet) {
+                if (!checkIntValueNorms(procRet, fieldDef, itemVal, minValue)) {
+                    return procRet;
+                }
+                tgtValueSet.add((Integer) procRet.retData1);
+            }
+            procResult.setSuccResult(tgtValueSet);
+        } else {
+            String paramValue = (String) procResult.retData1;
+            if (paramValue == null) {
+                procResult.setSuccResult(defValue);
+                return procResult;
+            }
+            checkIntValueNorms(procResult, fieldDef, paramValue, minValue);
+        }
+        return procResult;
+    }
+
+    /**
+     * Parse the parameter value from an object value to a boolean value
+     *
+     * @param req         Http Servlet Request
+     * @param fieldDef    the parameter field definition
+     * @param required    a boolean value represent whether the parameter is must required
+     * @param defValue    a default value returned if failed to parse value from the given object
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getBooleanParamValue(HttpServletRequest req,
+                                                     WebFieldDef fieldDef,
+                                                     boolean required,
+                                                     boolean defValue) {
+        ProcessResult procResult =
+                getStringParamValue(req, fieldDef, required, null);
+        if (!procResult.success) {
+            return procResult;
+        }
+        String paramValue = (String) procResult.retData1;
+        if (paramValue == null) {
+            procResult.setSuccResult(defValue);
+            return procResult;
+        }
+        procResult.setSuccResult(Boolean.parseBoolean(paramValue));
+        return procResult;
+    }
+
+    /**
+     * Parse the parameter value from an object value
+     *
+     * @param req         Http Servlet Request
+     * @param fieldDef    the parameter field definition
+     * @param required     a boolean value represent whether the parameter is must required
+     * @param defValue     a default value returned if failed to parse value from the given object
+     * @return valid result for the parameter value
+     */
+    public static ProcessResult getStringParamValue(HttpServletRequest req,
+                                                    WebFieldDef fieldDef,
+                                                    boolean required,
+                                                    String defValue) {
+        ProcessResult procResult = new ProcessResult();
+        // get parameter value
+        String paramValue = req.getParameter(fieldDef.name);
+        if (paramValue == null) {
+            paramValue = req.getParameter(fieldDef.shortName);
+        }
+        if (TStringUtils.isNotBlank(paramValue)) {
+            // Cleanup value extra characters
+            paramValue = escDoubleQuotes(paramValue.trim());
+        }
+        // Check if the parameter exists
+        if (TStringUtils.isBlank(paramValue)) {
+            if (required) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name)
+                                .append(" is missing or value is null or blank!").toString());
+            } else {
+                procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+            }
+            return procResult;
+        }
+        // check if value is norm;
+        if (fieldDef.isCompFieldType()) {
+            // split original value to items
+            Set<String> valItemSet = new HashSet<>();
+            String[] strParamValueItems = paramValue.split(fieldDef.splitToken);
+            for (String strParamValueItem : strParamValueItems) {
+                if (TStringUtils.isBlank(strParamValueItem)) {
+                    continue;
+                }
+                if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) {
+                    return procResult;
+                }
+                valItemSet.add((String) procResult.retData1);
+            }
+            // check if is empty result
+            if (valItemSet.isEmpty()) {
+                if (required) {
+                    procResult.setFailResult(fieldDef.id,
+                            new StringBuilder(512).append("Parameter ")
+                                    .append(fieldDef.name)
+                                    .append(" is missing or value is null or blank!").toString());
+                } else {
+                    procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                }
+                return procResult;
+            }
+            // check max item count
+            if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
+                if (valItemSet.size() > fieldDef.itemMaxCnt) {
+                    procResult.setFailResult(fieldDef.id,
+                            new StringBuilder(512).append("Parameter ")
+                                    .append(fieldDef.name)
+                                    .append("'s item count over max allowed count (")
+                                    .append(fieldDef.itemMaxCnt).append(")!").toString());
+                }
+            }
+            procResult.setSuccResult(valItemSet);
+        } else {
+            if (!checkStrValueNorms(procResult, fieldDef, paramValue)) {
+                return procResult;
+            }
+            procResult.setSuccResult(paramValue);
+        }
+        return procResult;
+    }
+
+    /**
+     * process string default value
+     *
+     * @param procResult process result
+     * @param isCompFieldType   the parameter if compound field type
+     * @param defValue   the parameter default value
+     * @return process result for default value of parameter
+     */
+    private static ProcessResult procStringDefValue(ProcessResult procResult,
+                                                    boolean isCompFieldType,
+                                                    String defValue) {
+        if (isCompFieldType) {
+            Set<String> valItemSet = new HashSet<>();
+            if (TStringUtils.isNotBlank(defValue)) {
+                valItemSet.add(defValue);
+            }
+            procResult.setSuccResult(valItemSet);
+        } else {
+            procResult.setSuccResult(defValue);
+        }
+        return procResult;
+    }
+
+    /**
+     * Parse the parameter string value by regex define
+     *
+     * @param procResult   process result
+     * @param fieldDef     the parameter field definition
+     * @param paramVal     the parameter value
+     * @return check result for string value of parameter
+     */
+    private static boolean checkStrValueNorms(ProcessResult procResult,
+                                              WebFieldDef fieldDef,
+                                              String paramVal) {
+        paramVal = paramVal.trim();
+        if (TStringUtils.isBlank(paramVal)) {
+            procResult.setSuccResult(null);
+            return true;
+        }
+        // check value's max length
+        if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
+            if (paramVal.length() > fieldDef.valMaxLen) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("over max length for ")
+                                .append(fieldDef.name).append(", only allow ")
+                                .append(fieldDef.valMaxLen).append(" length").toString());
+                return false;
+            }
+        }
+        // check value's pattern
+        if (fieldDef.regexCheck) {
+            if (!paramVal.matches(fieldDef.regexDef.getPattern())) {
+                procResult.setFailResult(fieldDef.id,
+                        new StringBuilder(512).append("illegal value for ")
+                                .append(fieldDef.name).append(", value ")
+                                .append(fieldDef.regexDef.getErrMsgTemp()).toString());
+                return false;
+            }
+        }
+        procResult.setSuccResult(paramVal);
+        return true;
+    }
+
+    /**
+     * Parse the parameter string value by regex define
+     *
+     * @param procResult   process result
+     * @param fieldDef     the parameter field definition
+     * @param paramValue   the parameter value
+     * param minValue      the parameter min value
+     * @return check result for string value of parameter
+     */
+    private static boolean checkIntValueNorms(ProcessResult procResult,
+                                              WebFieldDef fieldDef,
+                                              String paramValue,
+                                              int minValue) {
+        try {
+            int paramIntVal = Integer.parseInt(paramValue);
+            if (paramIntVal < minValue) {
+                procResult.setFailResult(400,
+                        new StringBuilder(512).append("Parameter ")
+                                .append(fieldDef.name).append(" value must >= ")
+                                .append(minValue).toString());
+                return false;
+            }
+            procResult.setSuccResult(paramIntVal);
+        } catch (Throwable e) {
+            procResult.setFailResult(400,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name).append(" parse error: ")
+                            .append(e.getMessage()).toString());
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Parse the parameter value from an object value to ip address of string value
      *
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
new file mode 100644
index 0000000..b83a966
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.webbase;
+
+
+
+public enum WebFieldType {
+
+    UNKNOWN(-1, "Unknown field type"),
+    STRING(1, "String"),
+    INT(2, "int"),
+    LONG(3, "long"),
+    BOOLEAN(4, "Boolean"),
+    DATE(5, "Date"),
+    COMPSTRING(6, "Compound string"),
+    COMPINT(7, "Compound integer");
+
+
+    public int value;
+    public String desc;
+
+    WebFieldType(int value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    public static WebFieldType valueOf(int value) {
+        for (WebFieldType fieldType : WebFieldType.values()) {
+            if (fieldType.getValue() == value) {
+                return fieldType;
+            }
+        }
+
+        return UNKNOWN;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public String getDesc(){
+        return desc;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
similarity index 87%
rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java
rename to tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
index a71ba03..a856014 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.server.master.web.handler;
+package org.apache.tubemq.server.common.webbase;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -24,10 +24,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class WebApiMapper {
+public class WebMethodMapper {
     // log printer
     private static final Logger logger =
-            LoggerFactory.getLogger(WebApiMapper.class);
+            LoggerFactory.getLogger(WebMethodMapper.class);
     // The query methods map
     public static final Map<String, WebApiRegInfo> WEB_QRY_METHOD_MAP =
             new HashMap<>();
@@ -47,7 +47,7 @@ public class WebApiMapper {
     public static void registerWebMethod(boolean isQryApi,
                                          String webMethodName,
                                          String clsMethodName,
-                                         AbstractWebHandler webHandler) {
+                                         Object webHandler) {
         Method[] methods = webHandler.getClass().getMethods();
         for (Method item : methods) {
             if (item.getName().equals(clsMethodName)) {
@@ -71,10 +71,10 @@ public class WebApiMapper {
 
     public static class WebApiRegInfo {
         public Method method;
-        public AbstractWebHandler webHandler;
+        public Object webHandler;
 
         public WebApiRegInfo(Method method,
-                             AbstractWebHandler webHandler) {
+                             Object webHandler) {
             this.method = method;
             this.webHandler = webHandler;
         }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index 326f690..5d4de04 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -17,7 +17,7 @@
 
 package org.apache.tubemq.server.master.web.action.screen;
 
-import static org.apache.tubemq.server.master.web.handler.WebApiMapper.getWebApiRegInfo;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
 
 import java.util.Arrays;
 import java.util.List;
@@ -25,13 +25,13 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corerpc.exception.StandbyException;
+import org.apache.tubemq.server.common.webbase.WebMethodMapper;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
 import org.apache.tubemq.server.master.web.handler.AbstractWebHandler;
 import org.apache.tubemq.server.master.web.handler.WebAdminFlowRuleHandler;
 import org.apache.tubemq.server.master.web.handler.WebAdminGroupCtrlHandler;
 import org.apache.tubemq.server.master.web.handler.WebAdminTopicAuthHandler;
-import org.apache.tubemq.server.master.web.handler.WebApiMapper;
 import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler;
 import org.apache.tubemq.server.master.web.handler.WebBrokerTopicConfHandler;
 import org.apache.tubemq.server.master.web.handler.WebMasterInfoHandler;
@@ -102,7 +102,7 @@ public class Webapi implements Action {
                             "DesignatedPrimary happened...please check if the other member is down");
                 }
             }
-            WebApiMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method);
+            WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method);
             if (webApiRegInfo == null) {
                 strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Unsupported method: ")
                         .append(method).append("}");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
index 09b11eb..1b1bfdc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
@@ -17,7 +17,7 @@
 
 package org.apache.tubemq.server.master.web.handler;
 
-import static org.apache.tubemq.server.master.web.handler.WebApiMapper.registerWebMethod;
+import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;