You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/06 08:37:53 UTC

[incubator-tubemq] branch master updated (c80af84 -> df37f5f)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git.


    from c80af84  [TUBEMQ-485]Add the batch setting API of consume group offset
     new 537ed34  [TUBEMQ-486]Add the delete API of consumer group offset
     new df37f5f  [TUBEMQ-486]Add the delete API of consumer group offset

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../server/broker/offset/DefaultOffsetManager.java | 104 ++++-
 .../tubemq/server/broker/offset/OffsetService.java |   4 +
 .../server/broker/web/BrokerAdminServlet.java      | 430 +++++++++++++--------
 .../tubemq/server/common/fielddef/WebFieldDef.java |   6 +-
 .../tubemq/server/common/fileconfig/ZKConfig.java  |   1 -
 .../server/common/offsetstorage/OffsetStorage.java |   7 +-
 .../common/offsetstorage/ZkOffsetStorage.java      | 175 ++++++---
 .../common/offsetstorage/zookeeper/ZKUtil.java     |  14 +
 .../tubemq/server/common/utils/ProcessResult.java  |   3 +
 .../server/common/utils/WebParameterUtils.java     | 278 ++++++-------
 .../web/handler/WebBrokerTopicConfHandler.java     |  21 +-
 11 files changed, 661 insertions(+), 382 deletions(-)


[incubator-tubemq] 02/02: [TUBEMQ-486]Add the delete API of consumer group offset

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit df37f5f443873add0a563d9637f84b0b533d90e3
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jan 6 11:33:28 2021 +0800

    [TUBEMQ-486]Add the delete API of consumer group offset
---
 .../java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 1f2fb5d..30d7247 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1104,7 +1104,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                     for (Integer partitionId : entry.getValue()) {
                         GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
                         offsetInfo.setPartPubStoreInfo(
-                                storeInfoMap == null ? null :storeInfoMap.get(partitionId));
+                                storeInfoMap == null ? null : storeInfoMap.get(partitionId));
                         offsetInfo.setConsumeOffsetInfo(
                                 partBookedMap == null ? null : partBookedMap.get(partitionId));
                         String queryKey = buildQueryID(group, topic, partitionId);


[incubator-tubemq] 01/02: [TUBEMQ-486]Add the delete API of consumer group offset

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 537ed34b39dfb007fea0d65773e2f1a509a1bd46
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jan 6 11:27:15 2021 +0800

    [TUBEMQ-486]Add the delete API of consumer group offset
---
 .../server/broker/offset/DefaultOffsetManager.java | 104 ++++-
 .../tubemq/server/broker/offset/OffsetService.java |   4 +
 .../server/broker/web/BrokerAdminServlet.java      | 430 +++++++++++++--------
 .../tubemq/server/common/fielddef/WebFieldDef.java |   6 +-
 .../tubemq/server/common/fileconfig/ZKConfig.java  |   1 -
 .../server/common/offsetstorage/OffsetStorage.java |   7 +-
 .../common/offsetstorage/ZkOffsetStorage.java      | 175 ++++++---
 .../common/offsetstorage/zookeeper/ZKUtil.java     |  14 +
 .../tubemq/server/common/utils/ProcessResult.java  |   3 +
 .../server/common/utils/WebParameterUtils.java     | 278 ++++++-------
 .../web/handler/WebBrokerTopicConfHandler.java     |  21 +-
 11 files changed, 661 insertions(+), 382 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 84dabb2..f052375 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.tubemq.server.broker.offset;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -339,7 +340,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         Set<String> groupSet = new HashSet<>();
         groupSet.addAll(cfmOffsetMap.keySet());
         Map<String, Set<String>> localGroups =
-                zkOffsetStorage.getZkLocalGroupTopicInfos();
+                zkOffsetStorage.queryZkAllGroupTopicInfos();
         groupSet.addAll(localGroups.keySet());
         return groupSet;
     }
@@ -364,7 +365,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     public Set<String> getUnusedGroupInfo() {
         Set<String> unUsedGroups = new HashSet<>();
         Map<String, Set<String>> localGroups =
-                zkOffsetStorage.getZkLocalGroupTopicInfos();
+                zkOffsetStorage.queryZkAllGroupTopicInfos();
         for (String groupName : localGroups.keySet()) {
             if (!cfmOffsetMap.containsKey(groupName)) {
                 unUsedGroups.add(groupName);
@@ -383,9 +384,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         Set<String> result = new HashSet<>();
         Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
         if (topicPartOffsetMap == null) {
-            Map<String, Set<String>> localGroups =
-                    zkOffsetStorage.getZkLocalGroupTopicInfos();
-            result = localGroups.get(group);
+            List<String> groupLst = new ArrayList<>(1);
+            groupLst.add(group);
+            Map<String, Set<String>> groupTopicInfo =
+                    zkOffsetStorage.queryZKGroupTopicInfo(groupLst);
+            result = groupTopicInfo.get(group);
         } else {
             for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) {
                 result.add(storageInfo.getTopic());
@@ -496,6 +499,53 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     /***
+     * Delete offset.
+     *
+     * @param onlyMemory
+     * @param groupTopicPartMap
+     * @param modifier
+     */
+    @Override
+    public void deleteGroupOffset(boolean onlyMemory,
+                                  Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+                                  String modifier) {
+        String printBase;
+        StringBuilder strBuidler = new StringBuilder(512);
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            rmvOffset(entry.getKey(), entry.getValue());
+        }
+        if (onlyMemory) {
+            printBase = strBuidler
+                    .append("[Offset Manager] delete offset from memory by modifier=")
+                    .append(modifier).toString();
+        } else {
+            zkOffsetStorage.deleteGroupOffsetInfo(groupTopicPartMap);
+            printBase = strBuidler
+                    .append("[Offset Manager] delete offset from memory and zk by modifier=")
+                    .append(modifier).toString();
+        }
+        strBuidler.delete(0, strBuidler.length());
+        // print log
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            logger.info(strBuidler.append(printBase).append(",group=").append(entry.getKey())
+                    .append(",topic-partId-map=").append(entry.getValue()).toString());
+            strBuidler.delete(0, strBuidler.length());
+        }
+    }
+
+    /***
      * Set temp offset.
      *
      * @param group
@@ -611,6 +661,50 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         return regInfo;
     }
 
+    private void rmvOffset(String group, Map<String, Set<Integer>> topicPartMap) {
+        if (group == null
+                || topicPartMap == null
+                || topicPartMap.isEmpty()) {
+            return;
+        }
+        // remove confirm offset
+        ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group);
+        if (regInfoMap != null) {
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                if (entry.getKey() == null
+                        || entry.getValue() == null
+                        || entry.getValue().isEmpty()) {
+                    continue;
+                }
+                for (Integer partitionId : entry.getValue()) {
+                    String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+                    regInfoMap.remove(offsetCacheKey);
+                }
+            }
+            if (regInfoMap.isEmpty()) {
+                cfmOffsetMap.remove(group);
+            }
+        }
+        // remove tmp offset
+        ConcurrentHashMap<String, Long> tmpRegInfoMap = tmpOffsetMap.get(group);
+        if (tmpRegInfoMap != null) {
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                if (entry.getKey() == null
+                        || entry.getValue() == null
+                        || entry.getValue().isEmpty()) {
+                    continue;
+                }
+                for (Integer partitionId : entry.getValue()) {
+                    String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+                    tmpRegInfoMap.remove(offsetCacheKey);
+                }
+            }
+            if (tmpRegInfoMap.isEmpty()) {
+                tmpOffsetMap.remove(group);
+            }
+        }
+    }
+
     private String getOffsetCacheKey(String topic, int partitionId) {
         return new StringBuilder(256).append(topic)
                 .append("-").append(partitionId).toString();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 9dcd29a..4a19798 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -72,4 +72,8 @@ public interface OffsetService {
     boolean modifyGroupOffset(Set<String> groups,
                               List<Tuple3<String, Integer, Long>> topicPartOffsets,
                               String modifier);
+
+    void deleteGroupOffset(boolean onlyMemory,
+                           Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+                           String modifier);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index c76a6b7..1f2fb5d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -95,10 +95,13 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // set or update group's offset info
         innRegisterWebMethod("admin_set_offset",
                 "adminSetGroupOffSet");
+        // remove group's offset info
+        innRegisterWebMethod("admin_rmv_offset",
+                "adminRemoveGroupOffSet");
     }
 
     public void adminQueryAllMethods(HttpServletRequest req,
-                                     StringBuilder sBuilder) throws Exception {
+                                     StringBuilder sBuilder) {
         int index = 0;
         List<String> methods = getSupportedMethod();
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -120,11 +123,11 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminQueryBrokerAllConsumerInfo(HttpServletRequest req,
-                                                StringBuilder sBuilder) throws Exception {
+                                                StringBuilder sBuilder) {
         int index = 0;
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, false, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -209,10 +212,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req,
-                                                    StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+                                                    StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -276,17 +279,16 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminGetMemStoreStatisInfo(HttpServletRequest req,
-                                           StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+                                           StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> topicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.NEEDREFRESH, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -335,38 +337,34 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminManualSetCurrentOffSet(HttpServletRequest req,
-                                            StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+                                            StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.GROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String groupName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String modifyUser = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-        result = WebParameterUtils.getLongParamValue(req,
-                WebFieldDef.MANUALOFFSET, true, -1);
-        if (!result.success) {
+        if (!WebParameterUtils.getLongParamValue(req,
+                WebFieldDef.MANUALOFFSET, true, -1, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -429,23 +427,21 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQuerySnapshotMessageSet(HttpServletRequest req,
                                              StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.MSGCOUNT, false, 3, 3);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.MSGCOUNT, false, 3, 3, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -457,9 +453,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                     .append("\"}");
             return;
         }
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.FILTERCONDS, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.FILTERCONDS, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -477,31 +472,27 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryCurrentGroupOffSet(HttpServletRequest req,
                                              StringBuilder sBuilder) {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.GROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String groupName = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.REQUIREREALOFFSET, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.REQUIREREALOFFSET, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -567,8 +558,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         Map<String, ConsumerNodeInfo> map =
                 broker.getBrokerServiceServer().getConsumerRegisterMap();
         int totalCnt = 0;
-        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
-                .append(",\"dataSet\":[");
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
         for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -592,10 +582,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryPubInfo(HttpServletRequest req,
                                   StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get the topic set to be queried
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -637,9 +627,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     public void adminQueryBookedGroup(HttpServletRequest req,
                                       StringBuilder sBuilder) {
         // get divide info
-        ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.WITHDIVIDE, false, false);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHDIVIDE, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -697,16 +687,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryGroupOffSet(HttpServletRequest req,
                                       StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get group list
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
+        Set<String> inGroupNameSet = (Set<String>) result.retData1;
+        // get the topic set to be queried
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> topicSet = (Set<String>) result.retData1;
         // filter invalid groups
         Set<String> qryGroupNameSet = new HashSet<>();
-        Set<String> inGroupNameSet = (Set<String>) result.retData1;
         Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups();
         if (inGroupNameSet.isEmpty()) {
             qryGroupNameSet = bookedGroupSet;
@@ -717,19 +715,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                 }
             }
         }
-        // get the topic set to be queried
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
-            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
-            return;
-        }
-        // get target consume group name
-        Set<String> topicSet = (Set<String>) result.retData1;
         // verify the acquired Topic set and
         //   query the corresponding offset information
         Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps =
-                getGroupOffsetInfo(qryGroupNameSet, topicSet);
+                getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, qryGroupNameSet, topicSet);
         // builder result
         int totalCnt = 0;
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -772,26 +761,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminSetGroupOffSet(HttpServletRequest req,
                                     StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get group list
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> groupNameSet = (Set<String>) result.retData1;
         // get set mode
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.MANUALSET, true, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.MANUALSET, true, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         boolean manualSet = (Boolean) result.retData1;
         // get modify user
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -799,17 +786,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         final String modifier = (String) result.retData1;
         if (manualSet) {
             // get offset json info
-            result = WebParameterUtils.getJsonDictParamValue(req,
-                    WebFieldDef.OFFSETJSON, true, null);
-            if (!result.success) {
+            if (!WebParameterUtils.getJsonDictParamValue(req,
+                    WebFieldDef.OFFSETJSON, true, null, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
             Map<String, Long> manOffsets =
                     (Map<String, Long>) result.retData1;
             // valid and transfer offset format
-            result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets);
-            if (!result.success) {
+            if (!validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
@@ -817,9 +802,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                     (List<Tuple3<String, Integer, Long>>) result.retData1;
         } else {
             // get the topic set to be set
-            result = WebParameterUtils.getStringParamValue(req,
-                    WebFieldDef.COMPSTOPICNAME, true, null);
-            if (!result.success) {
+            if (!WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSTOPICNAME, true, null, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
@@ -840,38 +824,43 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminCloneGroupOffSet(HttpServletRequest req,
                                       StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get source consume group name
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.SRCGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.SRCGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String srcGroupName = (String) result.retData1;
-        // get modify user
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        // get source consume group's topic set cloned to target group
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
-        final String modifier = (String) result.retData1;
-        // get source consume group's topic set cloned to target group
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        Set<String> srcTopicNameSet = (Set<String>) result.retData1;
+        // valid topic and get topic's partitionIds
+        if (!validAndGetTopicPartInfo(srcGroupName,
+                WebFieldDef.SRCGROUPNAME, srcTopicNameSet, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
+        Map<String, Set<Integer>> topicPartMap =
+                (Map<String, Set<Integer>>) result.retData1;
         // get target consume group name
-        Set<String> srcTopicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TGTCOMPSGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TGTCOMPSGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> tgtGroupNameSet = (Set<String>) result.retData1;
+        // get modify user
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String modifier = (String) result.retData1;
         // check sourceGroup if existed
         Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
         if (!bookedGroups.contains(srcGroupName)) {
@@ -882,16 +871,6 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                             .append(" has not been registered on this Broker!").toString());
             return;
         }
-        // valid topic and get topic's partitionIds
-        Map<String, Set<Integer>> topicPartMap =
-                validAndGetPartitions(srcGroupName, srcTopicNameSet);
-        if (topicPartMap.isEmpty()) {
-            WebParameterUtils.buildFailResult(sBuilder,
-                    new StringBuilder(512).append("Parameter ")
-                            .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ")
-                            .append(srcGroupName).append(" subscribed topic set!").toString());
-            return;
-        }
         // query offset from source group
         Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
                 broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
@@ -904,6 +883,56 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
     }
 
+    /***
+     * Remove consume group offset.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminRemoveGroupOffSet(HttpServletRequest req,
+                                      StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        // get consume group name
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Set<String> groupNameSet = (Set<String>) result.retData1;
+        // get modify user
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String modifier = (String) result.retData1;
+        // get need removed offset's topic
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // get set mode
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.ONLYMEM, false, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        boolean onlyMemory = (Boolean) result.retData1;
+        if (!validAndGetGroupTopicInfo(groupNameSet, topicNameSet, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Map<String, Map<String, Set<Integer>>> groupTopicPartMap =
+                (Map<String, Map<String, Set<Integer>>>) result.retData1;
+        broker.getOffsetManager().deleteGroupOffset(
+                onlyMemory, groupTopicPartMap, modifier);
+        // builder return result
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+    }
+
     // build reset offset info
     private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(
             Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) {
@@ -950,8 +979,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
         MessageStoreManager storeManager = broker.getStoreManager();
         // get topic's partition set
-        Map<String, Set<Integer>> topicPartMap =
-                validAndGetPartitions(null, topicSet);
+        Map<String, Set<Integer>> topicPartMap = getTopicPartitions(topicSet);
         // fill current topic's max offset value
         for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
             if (entry.getKey() == null
@@ -979,15 +1007,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     }
 
     // build reset offset info
-    private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef,
-                                                  Map<String, Long> manOffsetInfoMap) {
+    private boolean validManOffsetResetInfo(WebFieldDef fieldDef,
+                                            Map<String, Long> manOffsetInfoMap,
+                                            ProcessResult result) {
         String brokerId;
         String topicName;
         String strPartId;
         int partitionId;
         long adjOffset;
         MessageStore store = null;
-        ProcessResult procResult = new ProcessResult();
         MessageStoreManager storeManager = broker.getStoreManager();
         List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>();
         String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId());
@@ -1001,12 +1029,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             // parse and check partitionKey value
             String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
             if (keyItems.length != 3) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append("'s key invalid:")
                                 .append(entry.getKey())
                                 .append(" must be brokerId:topicName:partitionId !").toString());
-                return procResult;
+                return result.success;
             }
             brokerId = keyItems[0].trim();
             topicName = keyItems[1].trim();
@@ -1018,12 +1046,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             try {
                 partitionId = Integer.parseInt(strPartId);
             } catch (NumberFormatException e) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append("'s key invalid:")
                                 .append(entry.getKey())
                                 .append("'s partitionId value not number!").toString());
-                return procResult;
+                return result.success;
             }
             // check and adjust offset value
             try {
@@ -1041,65 +1069,129 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
         }
         if (offsetVals.isEmpty()) {
-            procResult.setFailResult(fieldDef.id,
+            result.setFailResult(fieldDef.id,
                     new StringBuilder(512).append("Parameter ")
-                            .append(fieldDef.name)
-                            .append("'s value is invalid!").toString());
+                            .append(fieldDef.name).append("'s value is invalid!").toString());
         } else {
-            procResult.setSuccResult(offsetVals);
+            result.setSuccResult(offsetVals);
         }
-        return procResult;
+        return result.success;
     }
 
     // builder group's offset info
     private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
-            Set<String> groupSet, Set<String> topicSet) {
-        long curReadDataOffset = -2;
-        long curDataLag = -2;
+            WebFieldDef groupFldDef, Set<String> groupSet, Set<String> topicSet) {
+        ProcessResult result = new ProcessResult();
         Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>();
         for (String group : groupSet) {
             Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>();
             // valid and get topic's partitionIds
-            Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet);
-            // get topic's publish info
-            Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
-                    broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
-            // get group's booked offset info
-            Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
-                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
-            // get offset info array
-            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
-                String topic = entry.getKey();
-                Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
-                Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
-                Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
-                for (Integer partitionId : entry.getValue()) {
-                    GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
-                    offsetInfo.setPartPubStoreInfo(storeInfoMap.get(partitionId));
-                    offsetInfo.setConsumeOffsetInfo(partBookedMap.get(partitionId));
-                    String queryKey = buildQueryID(group, topic, partitionId);
-                    ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
-                    if (nodeInfo != null) {
-                        offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+            if (validAndGetTopicPartInfo(group, groupFldDef, topicSet, result)) {
+                Map<String, Set<Integer>> topicPartMap =
+                        (Map<String, Set<Integer>>) result.retData1;
+                // get topic's publish info
+                Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+                        broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
+                // get group's booked offset info
+                Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
+                        broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+                // get offset info array
+                for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                    String topic = entry.getKey();
+                    Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
+                    Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
+                    Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
+                    for (Integer partitionId : entry.getValue()) {
+                        GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
+                        offsetInfo.setPartPubStoreInfo(
+                                storeInfoMap == null ? null :storeInfoMap.get(partitionId));
+                        offsetInfo.setConsumeOffsetInfo(
+                                partBookedMap == null ? null : partBookedMap.get(partitionId));
+                        String queryKey = buildQueryID(group, topic, partitionId);
+                        ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
+                        if (nodeInfo != null) {
+                            offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+                        }
+                        offsetInfo.calculateLag();
+                        partOffsetRet.put(partitionId, offsetInfo);
                     }
-                    offsetInfo.calculateLag();
-                    partOffsetRet.put(partitionId, offsetInfo);
+                    topicOffsetRet.put(topic, partOffsetRet);
                 }
-                topicOffsetRet.put(topic, partOffsetRet);
             }
             groupOffsetMaps.put(group, topicOffsetRet);
         }
         return groupOffsetMaps;
     }
 
+    // valid and get need removed group-topic info
+    private boolean validAndGetGroupTopicInfo(Set<String> groupSet,
+                                              Set<String> topicSet,
+                                              ProcessResult result) {
+        Map<String, Map<String, Set<Integer>>> groupTopicPartMap = new HashMap<>();
+        // filter group
+        Set<String> targetGroupSet = new HashSet<>();
+        Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
+        for (String orgGroup : groupSet) {
+            if (bookedGroups.contains(orgGroup)) {
+                targetGroupSet.add(orgGroup);
+            }
+        }
+        // valid specified topic set
+        for (String group : targetGroupSet) {
+            if (validAndGetTopicPartInfo(group, WebFieldDef.GROUPNAME, topicSet, result)) {
+                Map<String, Set<Integer>> topicPartMap =
+                        (Map<String, Set<Integer>>) result.retData1;
+                groupTopicPartMap.put(group, topicPartMap);
+            }
+        }
+        result.setSuccResult(groupTopicPartMap);
+        return true;
+    }
 
-    private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
-        Map<String, Set<Integer>> topicPartMap = new HashMap<>();
-        // query stored topic set stored in memory or zk
-        if (topicSet.isEmpty() && group != null) {
-            topicSet = broker.getOffsetManager().getGroupSubInfo(group);
+    private boolean validAndGetTopicPartInfo(String groupName,
+                                             WebFieldDef groupFldDef,
+                                             Set<String> topicSet,
+                                             ProcessResult result) {
+        Set<String> subTopicSet =
+                broker.getOffsetManager().getGroupSubInfo(groupName);
+        if (subTopicSet == null || subTopicSet.isEmpty()) {
+            result.setFailResult(400, new StringBuilder(512)
+                    .append("Parameter ").append(groupFldDef.name)
+                    .append(": subscribed topic set of ").append(groupName)
+                    .append(" query result is null!").toString());
+            return result.success;
         }
-        // get topic's partitionIds
+        // filter valid topic set
+        Set<String> tgtTopicSet = new HashSet<>();
+        if (topicSet.isEmpty()) {
+            tgtTopicSet = subTopicSet;
+        } else {
+            for (String topic : topicSet) {
+                if (subTopicSet.contains(topic)) {
+                    tgtTopicSet.add(topic);
+                }
+            }
+            if (tgtTopicSet.isEmpty()) {
+                result.setFailResult(400, new StringBuilder(512)
+                        .append("Parameter ").append(groupFldDef.name)
+                        .append(": ").append(groupName)
+                        .append(" unsubscribed to the specified topic set!").toString());
+                return result.success;
+            }
+        }
+        Map<String, Set<Integer>> topicPartMap = getTopicPartitions(tgtTopicSet);
+        if (topicPartMap.isEmpty()) {
+            result.setFailResult(400, new StringBuilder(512)
+                    .append("Parameter ").append(groupFldDef.name)
+                    .append(": all topics subscribed by the group have been deleted!").toString());
+            return result.success;
+        }
+        result.setSuccResult(topicPartMap);
+        return result.success;
+    }
+
+    private Map<String, Set<Integer>> getTopicPartitions(Set<String> topicSet) {
+        Map<String, Set<Integer>> topicPartMap = new HashMap<>();
         if (topicSet != null) {
             Map<String, TopicMetadata> topicConfigMap =
                     broker.getMetadataManager().getTopicConfigMap();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 45b862d..a65a223 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -81,8 +81,10 @@ public enum WebFieldDef {
             TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
     MANUALSET(20, "manualSet", "manSet",
             WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
-    OFFSETJSON(21, "offsetJsonSet", "offsetSet",
-            WebFieldType.JSONTYPE, "The offset set that needs to be added or modified");
+    OFFSETJSON(21, "offsetJsonInfo", "offsetInfo",
+            WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"),
+    ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN,
+            "Only clear the offset data in the memory cache, default is false");
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
index 5ac3683..8e677a5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
@@ -28,7 +28,6 @@ public class ZKConfig {
     private int zkSyncTimeMs = 1000;
     private long zkCommitPeriodMs = 5000L;
     private int zkCommitFailRetries = TServerConstants.CFG_ZK_COMMIT_DEFAULT_RETRIES;
-
     public ZKConfig() {
 
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
index dca7ca8..f5eee40 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.server.common.offsetstorage;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,10 +34,12 @@ public interface OffsetStorage {
                       final Collection<OffsetStorageInfo> offsetInfoList,
                       boolean isFailRetry);
 
-    Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos();
+    Map<String, Set<String>> queryZkAllGroupTopicInfos();
 
-    Map<String, Set<String>> getZkLocalGroupTopicInfos();
+    Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet);
 
     Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
                                             Set<Integer> partitionIds);
+
+    void deleteGroupOffsetInfo(Map<String, Map<String, Set<Integer>>> groupTopicPartMap);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 8094151..3700753 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -35,6 +35,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * A offset storage implementation with zookeeper
  */
@@ -64,18 +66,16 @@ public class ZkOffsetStorage implements OffsetStorage {
     private final String consumerZkDir;
     private final boolean isBroker;
     private final int brokerId;
+    private final String strBrokerId;
     private ZKConfig zkConfig;
     private ZooKeeperWatcher zkw;
-    // group-topic-brokerid
-    private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>();
-    // group-topic
-    private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>();
 
 
     public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
         this.zkConfig = zkConfig;
-        this.brokerId = brokerId;
         this.isBroker = isBroker;
+        this.brokerId = brokerId;
+        this.strBrokerId = String.valueOf(brokerId);
         this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
         this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
         try {
@@ -86,8 +86,6 @@ public class ZkOffsetStorage implements OffsetStorage {
                     .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
             System.exit(1);
         }
-        logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper");
-        queryAllZKGroupTopicInfo();
         logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!");
     }
 
@@ -102,16 +100,6 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     @Override
-    public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() {
-        return zkGroupTopicBrokerInfos;
-    }
-
-    @Override
-    public Map<String, Set<String>> getZkLocalGroupTopicInfos() {
-        return zkLocalGroupTopicInfos;
-    }
-
-    @Override
     public void commitOffset(final String group,
                              final Collection<OffsetStorageInfo> offsetInfoList,
                              boolean isFailRetry) {
@@ -242,63 +230,130 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     /**
-     * Get group-topic-brokerid map info stored in zookeeper.
-     * <p/>
-     * The broker only cares about the content of its own node,
-     * so this part only queries when the node starts, and
-     * caches relevant data in the memory for finding
-     *
+     * Query booked topic info of groups stored in zookeeper.
+     * @param groupSet query groups
+     * @return group--topic map info
      */
-    private void queryAllZKGroupTopicInfo() {
+    @Override
+    public Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet) {
+        String qryBrokerId;
+        Map<String, Set<String>> groupTopicMap = new HashMap<>();
         StringBuilder sBuider = new StringBuilder(512);
-        // get all booked groups name
+        if (groupSet == null || groupSet.isEmpty()) {
+            return groupTopicMap;
+        }
+        // build path base
         String groupNode = sBuider.append(this.consumerZkDir).toString();
-        List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
         sBuider.delete(0, sBuider.length());
-        if (bookedGroups != null) {
-            // get topic info by group
-            for (String group : bookedGroups) {
-                String topicNode = sBuider.append(groupNode)
-                        .append("/").append(group).append("/offsets").toString();
-                List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
-                sBuider.delete(0, sBuider.length());
-                Set<String> topicSet = new HashSet<>();
-                Map<String, Set<String>> topicBrokerSet = new HashMap<>();
-                if (consumeTopics != null) {
-                    // get broker info by topic
-                    for (String topic : consumeTopics) {
-                        String brokerNode = sBuider.append(topicNode)
-                                .append("/").append(topic).toString();
-                        List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
-                        sBuider.delete(0, sBuider.length());
-                        Set<String> brokerIdSet = new HashSet<>();
-                        if (brokerIds != null) {
-                            for (String idStr : brokerIds) {
-                                if (idStr != null) {
-                                    String[] brokerPartIdStrs =
-                                            idStr.split(TokenConstants.HYPHEN);
-                                    brokerIdSet.add(brokerPartIdStrs[0]);
+        // get the group managed by this broker
+        for (String group : groupSet) {
+            String topicNode = sBuider.append(groupNode)
+                    .append("/").append(group).append("/offsets").toString();
+            List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
+            sBuider.delete(0, sBuider.length());
+            Set<String> topicSet = new HashSet<>();
+            if (consumeTopics != null) {
+                for (String topic : consumeTopics) {
+                    if (topic == null) {
+                        continue;
+                    }
+                    String brokerNode = sBuider.append(topicNode)
+                            .append("/").append(topic).toString();
+                    List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
+                    sBuider.delete(0, sBuider.length());
+                    if (brokerIds != null) {
+                        for (String idStr : brokerIds) {
+                            if (idStr != null) {
+                                String[] brokerPartIdStrs =
+                                        idStr.split(TokenConstants.HYPHEN);
+                                qryBrokerId = brokerPartIdStrs[0];
+                                if (qryBrokerId != null
+                                        && strBrokerId.equals(qryBrokerId.trim())) {
+                                    topicSet.add(topic);
+                                    break;
                                 }
                             }
-                            if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) {
-                                topicSet.add(topic);
-                            }
                         }
-                        topicBrokerSet.put(topic, brokerIdSet);
                     }
                 }
-                if (!topicSet.isEmpty()) {
-                    zkLocalGroupTopicInfos.put(group, topicSet);
+            }
+            if (!topicSet.isEmpty()) {
+                groupTopicMap.put(group, topicSet);
+            }
+        }
+        return groupTopicMap;
+    }
+
+    /**
+     * Get group-topic map info stored in zookeeper.
+     * <p/>
+     * The broker only cares about the content of its own node
+     *
+     */
+    @Override
+    public Map<String, Set<String>> queryZkAllGroupTopicInfos() {
+        StringBuilder sBuider = new StringBuilder(512);
+        // get all booked groups name
+        String groupNode = sBuider.append(this.consumerZkDir).toString();
+        List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
+        return queryZKGroupTopicInfo(bookedGroups);
+    }
+
+    /**
+     * Get offset stored in zookeeper, if not found or error, set null
+     * <p/>
+     *
+     * @return partitionId--offset map info
+     */
+    @Override
+    public void deleteGroupOffsetInfo(
+            Map<String, Map<String, Set<Integer>>> groupTopicPartMap) {
+        StringBuilder sBuider = new StringBuilder(512);
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            String basePath = sBuider.append(this.consumerZkDir).append("/")
+                    .append(entry.getKey()).append("/offsets").toString();
+            sBuider.delete(0, sBuider.length());
+            Map<String, Set<Integer>> topicPartMap = entry.getValue();
+            for (Map.Entry<String, Set<Integer>> topicEntry : topicPartMap.entrySet()) {
+                if (topicEntry.getKey() == null
+                        || topicEntry.getValue() == null
+                        || topicEntry.getValue().isEmpty()) {
+                    continue;
+                }
+                Set<Integer> partIdSet = topicEntry.getValue();
+                for (Integer partitionId : partIdSet) {
+                    String offsetNode = sBuider.append(basePath).append("/")
+                            .append(topicEntry.getKey()).append("/")
+                            .append(brokerId).append(TokenConstants.HYPHEN)
+                            .append(partitionId).toString();
+                    sBuider.delete(0, sBuider.length());
+                    ZKUtil.delZNode(this.zkw, offsetNode);
                 }
-                zkGroupTopicBrokerInfos.put(group, topicBrokerSet);
+                String parentNode = sBuider.append(basePath).append("/")
+                        .append(topicEntry.getKey()).toString();
+                sBuider.delete(0, sBuider.length());
+                chkAndRmvBlankParentNode(parentNode);
             }
+            chkAndRmvBlankParentNode(basePath);
+            String parentNode = sBuider.append(this.consumerZkDir)
+                    .append("/").append(entry.getKey()).toString();
+            sBuider.delete(0, sBuider.length());
+            chkAndRmvBlankParentNode(parentNode);
         }
-        logger.info(new StringBuilder(256)
-                .append("[ZkOffsetStorage] query from zookeeper, total group size = ")
-                .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ")
-                .append(zkLocalGroupTopicInfos.size()).toString());
     }
 
+    private void chkAndRmvBlankParentNode(String parentNode) {
+        List<String> nodeSet = ZKUtil.getChildren(zkw, parentNode);
+        if (nodeSet != null && nodeSet.isEmpty()) {
+            ZKUtil.delZNode(this.zkw, parentNode);
+        }
+    }
 
     private String normalize(final String root) {
         if (root.startsWith("/")) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
index da86191..5eb6cea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
@@ -167,6 +167,20 @@ public class ZKUtil {
         }
     }
 
+    /**
+     * delete the specified znode.
+     *
+     * @param zkw   zk reference
+     * @param znode path of node
+     */
+    public static void delZNode(ZooKeeperWatcher zkw, String znode) {
+        try {
+            zkw.getRecoverableZooKeeper().delete(znode, -1);
+        } catch (Throwable e) {
+            //
+        }
+    }
+
     private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
                                           boolean watcherSet) throws KeeperException {
         try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
index 3688b1c..8cabf67 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -44,10 +44,13 @@ public class ProcessResult {
         this.success = false;
         this.errCode = errCode;
         this.errInfo = errMsg;
+        this.retData1 = null;
     }
 
     public void setSuccResult(Object retData) {
         this.success = true;
+        this.errInfo = "";
+        this.errCode = TErrCodeConstants.SUCCESS;
         this.retData1 = retData;
     }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index fddd5de..2318bdb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -249,32 +249,32 @@ public class WebParameterUtils {
      * @param fieldDef   the parameter field definition
      * @param required   a boolean value represent whether the parameter is must required
      * @param defValue   a default value returned if failed to parse value from the given object
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getLongParamValue(HttpServletRequest req,
-                                                  WebFieldDef fieldDef,
-                                                  boolean required,
-                                                  long defValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
-        }
-        String paramValue = (String) procResult.retData1;
+    public static boolean getLongParamValue(HttpServletRequest req,
+                                            WebFieldDef fieldDef,
+                                            boolean required,
+                                            long defValue,
+                                            ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
+        }
+        String paramValue = (String) result.retData1;
         if (paramValue == null) {
-            procResult.setSuccResult(defValue);
-            return procResult;
+            result.setSuccResult(defValue);
+            return result.success;
         }
         try {
             long paramIntVal = Long.parseLong(paramValue);
-            procResult.setSuccResult(paramIntVal);
+            result.setSuccResult(paramIntVal);
         } catch (Throwable e) {
-            procResult.setFailResult(400,
+            result.setFailResult(400,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name).append(" parse error: ")
                             .append(e.getMessage()).toString());
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -283,43 +283,44 @@ public class WebParameterUtils {
      * @param req        Http Servlet Request
      * @param fieldDef   the parameter field definition
      * @param required   a boolean value represent whether the parameter is must required
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getIntParamValue(HttpServletRequest req,
-                                                 WebFieldDef fieldDef,
-                                                 boolean required) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
+    public static boolean getIntParamValue(HttpServletRequest req,
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef,
+                required, null, result)) {
+            return result.success;
         }
-        ProcessResult procRet = new ProcessResult();
         Set<Integer> tgtValueSet = new HashSet<Integer>();
         if (fieldDef.isCompFieldType()) {
-            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            Set<String> valItemSet = (Set<String>) result.retData1;
             if (valItemSet.isEmpty()) {
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) {
-                    return procRet;
+                if (!checkIntValueNorms(fieldDef,
+                        itemVal, false, -1, result)) {
+                    return result.success;
                 }
-                tgtValueSet.add((Integer) procRet.retData1);
+                tgtValueSet.add((Integer) result.retData1);
             }
         } else {
-            String paramValue = (String) procResult.retData1;
+            String paramValue = (String) result.retData1;
             if (paramValue == null) {
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
-            if (!checkIntValueNorms(procRet,
-                    fieldDef, paramValue, false, -1)) {
-                tgtValueSet.add((Integer) procRet.retData1);
+            if (!checkIntValueNorms(fieldDef,
+                    paramValue, false, -1, result)) {
+                tgtValueSet.add((Integer) result.retData1);
             }
         }
-        procResult.setSuccResult(tgtValueSet);
-        return procResult;
+        result.setSuccResult(tgtValueSet);
+        return result.success;
     }
 
     /**
@@ -330,43 +331,44 @@ public class WebParameterUtils {
      * @param required   a boolean value represent whether the parameter is must required
      * @param defValue   a default value returned if failed to parse value from the given object
      * @param minValue   min value required
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getIntParamValue(HttpServletRequest req,
+    public static boolean getIntParamValue(HttpServletRequest req,
                                                  WebFieldDef fieldDef,
                                                  boolean required,
                                                  int defValue,
-                                                 int minValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
+                                                 int minValue,
+                                                 ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
         }
         if (fieldDef.isCompFieldType()) {
             Set<Integer> tgtValueSet = new HashSet<Integer>();
-            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            Set<String> valItemSet = (Set<String>) result.retData1;
             if (valItemSet.isEmpty()) {
                 tgtValueSet.add(defValue);
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
-            ProcessResult procRet = new ProcessResult();
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) {
-                    return procRet;
+                if (!checkIntValueNorms(fieldDef,
+                        itemVal, true, minValue, result)) {
+                    return result.success;
                 }
-                tgtValueSet.add((Integer) procRet.retData1);
+                tgtValueSet.add((Integer) result.retData1);
             }
-            procResult.setSuccResult(tgtValueSet);
+            result.setSuccResult(tgtValueSet);
         } else {
-            String paramValue = (String) procResult.retData1;
+            String paramValue = (String) result.retData1;
             if (paramValue == null) {
-                procResult.setSuccResult(defValue);
-                return procResult;
+                result.setSuccResult(defValue);
+                return result.success;
             }
-            checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue);
+            checkIntValueNorms(fieldDef,
+                    paramValue, true, minValue, result);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -376,24 +378,24 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required    a boolean value represent whether the parameter is must required
      * @param defValue    a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getBooleanParamValue(HttpServletRequest req,
-                                                     WebFieldDef fieldDef,
-                                                     boolean required,
-                                                     boolean defValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
-        }
-        String paramValue = (String) procResult.retData1;
+    public static boolean getBooleanParamValue(HttpServletRequest req,
+                                               WebFieldDef fieldDef,
+                                               boolean required,
+                                               boolean defValue,
+                                               ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
+        }
+        String paramValue = (String) result.retData1;
         if (paramValue == null) {
-            procResult.setSuccResult(defValue);
-            return procResult;
+            result.setSuccResult(defValue);
+            return result.success;
         }
-        procResult.setSuccResult(Boolean.parseBoolean(paramValue));
-        return procResult;
+        result.setSuccResult(Boolean.parseBoolean(paramValue));
+        return result.success;
     }
 
     /**
@@ -403,13 +405,14 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required     a boolean value represent whether the parameter is must required
      * @param defValue     a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getStringParamValue(HttpServletRequest req,
-                                                    WebFieldDef fieldDef,
-                                                    boolean required,
-                                                    String defValue) {
-        ProcessResult procResult = new ProcessResult();
+    public static boolean getStringParamValue(HttpServletRequest req,
+                                              WebFieldDef fieldDef,
+                                              boolean required,
+                                              String defValue,
+                                              ProcessResult result) {
         // get parameter value
         String paramValue = req.getParameter(fieldDef.name);
         if (paramValue == null) {
@@ -422,14 +425,14 @@ public class WebParameterUtils {
         // Check if the parameter exists
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append(" is missing or value is null or blank!").toString());
             } else {
-                procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
             }
-            return procResult;
+            return result.success;
         }
         // check if value is norm;
         if (fieldDef.isCompFieldType()) {
@@ -440,41 +443,41 @@ public class WebParameterUtils {
                 if (TStringUtils.isBlank(strParamValueItem)) {
                     continue;
                 }
-                if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) {
-                    return procResult;
+                if (!checkStrValueNorms(fieldDef, strParamValueItem, result)) {
+                    return result.success;
                 }
-                valItemSet.add((String) procResult.retData1);
+                valItemSet.add((String) result.retData1);
             }
             // check if is empty result
             if (valItemSet.isEmpty()) {
                 if (required) {
-                    procResult.setFailResult(fieldDef.id,
+                    result.setFailResult(fieldDef.id,
                             new StringBuilder(512).append("Parameter ")
                                     .append(fieldDef.name)
                                     .append(" is missing or value is null or blank!").toString());
                 } else {
-                    procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                    procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
                 }
-                return procResult;
+                return result.success;
             }
             // check max item count
             if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
                 if (valItemSet.size() > fieldDef.itemMaxCnt) {
-                    procResult.setFailResult(fieldDef.id,
+                    result.setFailResult(fieldDef.id,
                             new StringBuilder(512).append("Parameter ")
                                     .append(fieldDef.name)
                                     .append("'s item count over max allowed count (")
                                     .append(fieldDef.itemMaxCnt).append(")!").toString());
                 }
             }
-            procResult.setSuccResult(valItemSet);
+            result.setSuccResult(valItemSet);
         } else {
-            if (!checkStrValueNorms(procResult, fieldDef, paramValue)) {
-                return procResult;
+            if (!checkStrValueNorms(fieldDef, paramValue, result)) {
+                return result.success;
             }
-            procResult.setSuccResult(paramValue);
+            result.setSuccResult(paramValue);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -484,13 +487,14 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required    a boolean value represent whether the parameter is must required
      * @param defValue    a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getJsonDictParamValue(HttpServletRequest req,
-                                                      WebFieldDef fieldDef,
-                                                      boolean required,
-                                                      Map<String, Long> defValue) {
-        ProcessResult procResult = new ProcessResult();
+    public static boolean getJsonDictParamValue(HttpServletRequest req,
+                                                WebFieldDef fieldDef,
+                                                boolean required,
+                                                Map<String, Long> defValue,
+                                                ProcessResult result) {
         // get parameter value
         String paramValue = req.getParameter(fieldDef.name);
         if (paramValue == null) {
@@ -503,20 +507,20 @@ public class WebParameterUtils {
         // Check if the parameter exists
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append(" is missing or value is null or blank!").toString());
             } else {
-                procResult.setSuccResult(defValue);
+                result.setSuccResult(defValue);
             }
-            return procResult;
+            return result.success;
         }
         try {
             paramValue = URLDecoder.decode(paramValue,
                     TBaseConstants.META_DEFAULT_CHARSET_NAME);
         } catch (UnsupportedEncodingException e) {
-            procResult.setFailResult(fieldDef.id,
+            result.setFailResult(fieldDef.id,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name)
                             .append(" decode error, exception is ")
@@ -524,73 +528,83 @@ public class WebParameterUtils {
         }
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append("'s value is blank!").toString());
             } else {
-                procResult.setSuccResult(defValue);
+                result.setSuccResult(defValue);
             }
-            return procResult;
+            return result.success;
         }
         if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
             if (paramValue.length() > fieldDef.valMaxLen) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append("'s length over max allowed length (")
                                 .append(fieldDef.valMaxLen).append(")!").toString());
-                return procResult;
+                return result.success;
             }
         }
-        procResult.setSuccResult(new Gson().fromJson(paramValue,
-                new TypeToken<Map<String, Long>>(){}.getType()));
-        return procResult;
+        // parse data
+        try {
+            Map<String, Long> manOffsets = new Gson().fromJson(paramValue,
+                    new TypeToken<Map<String, Long>>(){}.getType());
+            result.setSuccResult(manOffsets);
+        } catch (Throwable e) {
+            result.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append(" value parse failure, error is ")
+                            .append(e.getMessage()).append("!").toString());
+        }
+        return result.success;
     }
 
     /**
      * process string default value
      *
-     * @param procResult process result
      * @param isCompFieldType   the parameter if compound field type
      * @param defValue   the parameter default value
+     * @param result process result
      * @return process result for default value of parameter
      */
-    private static ProcessResult procStringDefValue(ProcessResult procResult,
-                                                    boolean isCompFieldType,
-                                                    String defValue) {
+    private static boolean procStringDefValue(boolean isCompFieldType,
+                                              String defValue,
+                                              ProcessResult result) {
         if (isCompFieldType) {
             Set<String> valItemSet = new HashSet<>();
             if (TStringUtils.isNotBlank(defValue)) {
                 valItemSet.add(defValue);
             }
-            procResult.setSuccResult(valItemSet);
+            result.setSuccResult(valItemSet);
         } else {
-            procResult.setSuccResult(defValue);
+            result.setSuccResult(defValue);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
      * Parse the parameter string value by regex define
      *
-     * @param procResult   process result
      * @param fieldDef     the parameter field definition
      * @param paramVal     the parameter value
+     * @param result       process result
      * @return check result for string value of parameter
      */
-    private static boolean checkStrValueNorms(ProcessResult procResult,
-                                              WebFieldDef fieldDef,
-                                              String paramVal) {
+    private static boolean checkStrValueNorms(WebFieldDef fieldDef,
+                                              String paramVal,
+                                              ProcessResult result) {
         paramVal = paramVal.trim();
         if (TStringUtils.isBlank(paramVal)) {
-            procResult.setSuccResult(null);
+            result.setSuccResult(null);
             return true;
         }
         // check value's max length
         if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
             if (paramVal.length() > fieldDef.valMaxLen) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("over max length for ")
                                 .append(fieldDef.name).append(", only allow ")
                                 .append(fieldDef.valMaxLen).append(" length").toString());
@@ -600,44 +614,44 @@ public class WebParameterUtils {
         // check value's pattern
         if (fieldDef.regexCheck) {
             if (!paramVal.matches(fieldDef.regexDef.getPattern())) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("illegal value for ")
                                 .append(fieldDef.name).append(", value ")
                                 .append(fieldDef.regexDef.getErrMsgTemp()).toString());
                 return false;
             }
         }
-        procResult.setSuccResult(paramVal);
+        result.setSuccResult(paramVal);
         return true;
     }
 
     /**
      * Parse the parameter string value by regex define
      *
-     * @param procResult   process result
      * @param fieldDef     the parameter field definition
      * @param paramValue   the parameter value
      * @param hasMinVal    whether there is a minimum
      * param minValue      the parameter min value
+     * @param result   process result
      * @return check result for string value of parameter
      */
-    private static boolean checkIntValueNorms(ProcessResult procResult,
-                                              WebFieldDef fieldDef,
+    private static boolean checkIntValueNorms(WebFieldDef fieldDef,
                                               String paramValue,
                                               boolean hasMinVal,
-                                              int minValue) {
+                                              int minValue,
+                                              ProcessResult result) {
         try {
             int paramIntVal = Integer.parseInt(paramValue);
             if (hasMinVal && paramIntVal < minValue) {
-                procResult.setFailResult(400,
+                result.setFailResult(400,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append(" value must >= ")
                                 .append(minValue).toString());
                 return false;
             }
-            procResult.setSuccResult(paramIntVal);
+            result.setSuccResult(paramIntVal);
         } catch (Throwable e) {
-            procResult.setFailResult(400,
+            result.setFailResult(400,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name).append(" parse error: ")
                             .append(e.getMessage()).toString());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 3bdfe16..8bef5ab 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -207,7 +207,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                         WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
                                 req.getParameter("memCacheMsgSizeInMB"),
                                 false, defmemCacheMsgSizeInMB, 2);
-                memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+                memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
                 int memCacheFlushIntvl =
                         WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                 req.getParameter("memCacheFlushIntvl"),
@@ -354,7 +354,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                             WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
                                     jsonObject.get("memCacheMsgSizeInMB"),
                                     false, brokerConfEntity.getDftMemCacheMsgSizeInMB(), 2);
-                    memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+                    memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
                     int memCacheFlushIntvl =
                             WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                     jsonObject.get("memCacheFlushIntvl"),
@@ -714,10 +714,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
      * @return
      */
     public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
-        ProcessResult result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.COMPSBROKERID, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }
@@ -752,17 +752,16 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
      * @return
      */
     public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }
         Set<String> topicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.WITHIP, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHIP, false, false, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }