You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/06 08:37:54 UTC

[incubator-tubemq] 01/02: [TUBEMQ-486]Add the delete API of consumer group offset

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 537ed34b39dfb007fea0d65773e2f1a509a1bd46
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jan 6 11:27:15 2021 +0800

    [TUBEMQ-486]Add the delete API of consumer group offset
---
 .../server/broker/offset/DefaultOffsetManager.java | 104 ++++-
 .../tubemq/server/broker/offset/OffsetService.java |   4 +
 .../server/broker/web/BrokerAdminServlet.java      | 430 +++++++++++++--------
 .../tubemq/server/common/fielddef/WebFieldDef.java |   6 +-
 .../tubemq/server/common/fileconfig/ZKConfig.java  |   1 -
 .../server/common/offsetstorage/OffsetStorage.java |   7 +-
 .../common/offsetstorage/ZkOffsetStorage.java      | 175 ++++++---
 .../common/offsetstorage/zookeeper/ZKUtil.java     |  14 +
 .../tubemq/server/common/utils/ProcessResult.java  |   3 +
 .../server/common/utils/WebParameterUtils.java     | 278 ++++++-------
 .../web/handler/WebBrokerTopicConfHandler.java     |  21 +-
 11 files changed, 661 insertions(+), 382 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 84dabb2..f052375 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.tubemq.server.broker.offset;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -339,7 +340,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         Set<String> groupSet = new HashSet<>();
         groupSet.addAll(cfmOffsetMap.keySet());
         Map<String, Set<String>> localGroups =
-                zkOffsetStorage.getZkLocalGroupTopicInfos();
+                zkOffsetStorage.queryZkAllGroupTopicInfos();
         groupSet.addAll(localGroups.keySet());
         return groupSet;
     }
@@ -364,7 +365,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     public Set<String> getUnusedGroupInfo() {
         Set<String> unUsedGroups = new HashSet<>();
         Map<String, Set<String>> localGroups =
-                zkOffsetStorage.getZkLocalGroupTopicInfos();
+                zkOffsetStorage.queryZkAllGroupTopicInfos();
         for (String groupName : localGroups.keySet()) {
             if (!cfmOffsetMap.containsKey(groupName)) {
                 unUsedGroups.add(groupName);
@@ -383,9 +384,11 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         Set<String> result = new HashSet<>();
         Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
         if (topicPartOffsetMap == null) {
-            Map<String, Set<String>> localGroups =
-                    zkOffsetStorage.getZkLocalGroupTopicInfos();
-            result = localGroups.get(group);
+            List<String> groupLst = new ArrayList<>(1);
+            groupLst.add(group);
+            Map<String, Set<String>> groupTopicInfo =
+                    zkOffsetStorage.queryZKGroupTopicInfo(groupLst);
+            result = groupTopicInfo.get(group);
         } else {
             for (OffsetStorageInfo storageInfo : topicPartOffsetMap.values()) {
                 result.add(storageInfo.getTopic());
@@ -496,6 +499,53 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     }
 
     /***
+     * Delete offset.
+     *
+     * @param onlyMemory
+     * @param groupTopicPartMap
+     * @param modifier
+     */
+    @Override
+    public void deleteGroupOffset(boolean onlyMemory,
+                                  Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+                                  String modifier) {
+        String printBase;
+        StringBuilder strBuidler = new StringBuilder(512);
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            rmvOffset(entry.getKey(), entry.getValue());
+        }
+        if (onlyMemory) {
+            printBase = strBuidler
+                    .append("[Offset Manager] delete offset from memory by modifier=")
+                    .append(modifier).toString();
+        } else {
+            zkOffsetStorage.deleteGroupOffsetInfo(groupTopicPartMap);
+            printBase = strBuidler
+                    .append("[Offset Manager] delete offset from memory and zk by modifier=")
+                    .append(modifier).toString();
+        }
+        strBuidler.delete(0, strBuidler.length());
+        // print log
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            logger.info(strBuidler.append(printBase).append(",group=").append(entry.getKey())
+                    .append(",topic-partId-map=").append(entry.getValue()).toString());
+            strBuidler.delete(0, strBuidler.length());
+        }
+    }
+
+    /***
      * Set temp offset.
      *
      * @param group
@@ -611,6 +661,50 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         return regInfo;
     }
 
+    private void rmvOffset(String group, Map<String, Set<Integer>> topicPartMap) {
+        if (group == null
+                || topicPartMap == null
+                || topicPartMap.isEmpty()) {
+            return;
+        }
+        // remove confirm offset
+        ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group);
+        if (regInfoMap != null) {
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                if (entry.getKey() == null
+                        || entry.getValue() == null
+                        || entry.getValue().isEmpty()) {
+                    continue;
+                }
+                for (Integer partitionId : entry.getValue()) {
+                    String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+                    regInfoMap.remove(offsetCacheKey);
+                }
+            }
+            if (regInfoMap.isEmpty()) {
+                cfmOffsetMap.remove(group);
+            }
+        }
+        // remove tmp offset
+        ConcurrentHashMap<String, Long> tmpRegInfoMap = tmpOffsetMap.get(group);
+        if (tmpRegInfoMap != null) {
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                if (entry.getKey() == null
+                        || entry.getValue() == null
+                        || entry.getValue().isEmpty()) {
+                    continue;
+                }
+                for (Integer partitionId : entry.getValue()) {
+                    String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId);
+                    tmpRegInfoMap.remove(offsetCacheKey);
+                }
+            }
+            if (tmpRegInfoMap.isEmpty()) {
+                tmpOffsetMap.remove(group);
+            }
+        }
+    }
+
     private String getOffsetCacheKey(String topic, int partitionId) {
         return new StringBuilder(256).append(topic)
                 .append("-").append(partitionId).toString();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 9dcd29a..4a19798 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -72,4 +72,8 @@ public interface OffsetService {
     boolean modifyGroupOffset(Set<String> groups,
                               List<Tuple3<String, Integer, Long>> topicPartOffsets,
                               String modifier);
+
+    void deleteGroupOffset(boolean onlyMemory,
+                           Map<String, Map<String, Set<Integer>>> groupTopicPartMap,
+                           String modifier);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index c76a6b7..1f2fb5d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -95,10 +95,13 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // set or update group's offset info
         innRegisterWebMethod("admin_set_offset",
                 "adminSetGroupOffSet");
+        // remove group's offset info
+        innRegisterWebMethod("admin_rmv_offset",
+                "adminRemoveGroupOffSet");
     }
 
     public void adminQueryAllMethods(HttpServletRequest req,
-                                     StringBuilder sBuilder) throws Exception {
+                                     StringBuilder sBuilder) {
         int index = 0;
         List<String> methods = getSupportedMethod();
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -120,11 +123,11 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminQueryBrokerAllConsumerInfo(HttpServletRequest req,
-                                                StringBuilder sBuilder) throws Exception {
+                                                StringBuilder sBuilder) {
         int index = 0;
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, false, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -209,10 +212,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req,
-                                                    StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+                                                    StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -276,17 +279,16 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminGetMemStoreStatisInfo(HttpServletRequest req,
-                                           StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+                                           StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> topicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.NEEDREFRESH, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.NEEDREFRESH, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -335,38 +337,34 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      * @throws Exception
      */
     public void adminManualSetCurrentOffSet(HttpServletRequest req,
-                                            StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+                                            StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.GROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String groupName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String modifyUser = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-        result = WebParameterUtils.getLongParamValue(req,
-                WebFieldDef.MANUALOFFSET, true, -1);
-        if (!result.success) {
+        if (!WebParameterUtils.getLongParamValue(req,
+                WebFieldDef.MANUALOFFSET, true, -1, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -429,23 +427,21 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQuerySnapshotMessageSet(HttpServletRequest req,
                                              StringBuilder sBuilder) throws Exception {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.MSGCOUNT, false, 3, 3);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.MSGCOUNT, false, 3, 3, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -457,9 +453,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                     .append("\"}");
             return;
         }
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.FILTERCONDS, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.FILTERCONDS, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -477,31 +472,27 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryCurrentGroupOffSet(HttpServletRequest req,
                                              StringBuilder sBuilder) {
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TOPICNAME, true, null);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TOPICNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String topicName = (String) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.GROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String groupName = (String) result.retData1;
-        result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.PARTITIONID, true, -1, 0);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.PARTITIONID, true, -1, 0, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         int partitionId = (Integer) result.retData1;
-
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.REQUIREREALOFFSET, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.REQUIREREALOFFSET, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -567,8 +558,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         Map<String, ConsumerNodeInfo> map =
                 broker.getBrokerServiceServer().getConsumerRegisterMap();
         int totalCnt = 0;
-        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",")
-                .append(",\"dataSet\":[");
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
         for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -592,10 +582,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryPubInfo(HttpServletRequest req,
                                   StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get the topic set to be queried
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -637,9 +627,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     public void adminQueryBookedGroup(HttpServletRequest req,
                                       StringBuilder sBuilder) {
         // get divide info
-        ProcessResult result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.WITHDIVIDE, false, false);
-        if (!result.success) {
+        ProcessResult result = new ProcessResult();
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHDIVIDE, false, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -697,16 +687,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminQueryGroupOffSet(HttpServletRequest req,
                                       StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get group list
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
+        Set<String> inGroupNameSet = (Set<String>) result.retData1;
+        // get the topic set to be queried
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> topicSet = (Set<String>) result.retData1;
         // filter invalid groups
         Set<String> qryGroupNameSet = new HashSet<>();
-        Set<String> inGroupNameSet = (Set<String>) result.retData1;
         Set<String> bookedGroupSet = broker.getOffsetManager().getBookedGroups();
         if (inGroupNameSet.isEmpty()) {
             qryGroupNameSet = bookedGroupSet;
@@ -717,19 +715,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                 }
             }
         }
-        // get the topic set to be queried
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
-            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
-            return;
-        }
-        // get target consume group name
-        Set<String> topicSet = (Set<String>) result.retData1;
         // verify the acquired Topic set and
         //   query the corresponding offset information
         Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps =
-                getGroupOffsetInfo(qryGroupNameSet, topicSet);
+                getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, qryGroupNameSet, topicSet);
         // builder result
         int totalCnt = 0;
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
@@ -772,26 +761,24 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminSetGroupOffSet(HttpServletRequest req,
                                     StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get group list
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> groupNameSet = (Set<String>) result.retData1;
         // get set mode
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.MANUALSET, true, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.MANUALSET, true, false, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         boolean manualSet = (Boolean) result.retData1;
         // get modify user
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
@@ -799,17 +786,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         final String modifier = (String) result.retData1;
         if (manualSet) {
             // get offset json info
-            result = WebParameterUtils.getJsonDictParamValue(req,
-                    WebFieldDef.OFFSETJSON, true, null);
-            if (!result.success) {
+            if (!WebParameterUtils.getJsonDictParamValue(req,
+                    WebFieldDef.OFFSETJSON, true, null, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
             Map<String, Long> manOffsets =
                     (Map<String, Long>) result.retData1;
             // valid and transfer offset format
-            result = validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets);
-            if (!result.success) {
+            if (!validManOffsetResetInfo(WebFieldDef.OFFSETJSON, manOffsets, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
@@ -817,9 +802,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                     (List<Tuple3<String, Integer, Long>>) result.retData1;
         } else {
             // get the topic set to be set
-            result = WebParameterUtils.getStringParamValue(req,
-                    WebFieldDef.COMPSTOPICNAME, true, null);
-            if (!result.success) {
+            if (!WebParameterUtils.getStringParamValue(req,
+                    WebFieldDef.COMPSTOPICNAME, true, null, result)) {
                 WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
                 return;
             }
@@ -840,38 +824,43 @@ public class BrokerAdminServlet extends AbstractWebHandler {
      */
     public void adminCloneGroupOffSet(HttpServletRequest req,
                                       StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
         // get source consume group name
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.SRCGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.SRCGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         final String srcGroupName = (String) result.retData1;
-        // get modify user
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.MODIFYUSER, true, null);
-        if (!result.success) {
+        // get source consume group's topic set cloned to target group
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
-        final String modifier = (String) result.retData1;
-        // get source consume group's topic set cloned to target group
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        Set<String> srcTopicNameSet = (Set<String>) result.retData1;
+        // valid topic and get topic's partitionIds
+        if (!validAndGetTopicPartInfo(srcGroupName,
+                WebFieldDef.SRCGROUPNAME, srcTopicNameSet, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
+        Map<String, Set<Integer>> topicPartMap =
+                (Map<String, Set<Integer>>) result.retData1;
         // get target consume group name
-        Set<String> srcTopicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.TGTCOMPSGROUPNAME, true, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.TGTCOMPSGROUPNAME, true, null, result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return;
         }
         Set<String> tgtGroupNameSet = (Set<String>) result.retData1;
+        // get modify user
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String modifier = (String) result.retData1;
         // check sourceGroup if existed
         Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
         if (!bookedGroups.contains(srcGroupName)) {
@@ -882,16 +871,6 @@ public class BrokerAdminServlet extends AbstractWebHandler {
                             .append(" has not been registered on this Broker!").toString());
             return;
         }
-        // valid topic and get topic's partitionIds
-        Map<String, Set<Integer>> topicPartMap =
-                validAndGetPartitions(srcGroupName, srcTopicNameSet);
-        if (topicPartMap.isEmpty()) {
-            WebParameterUtils.buildFailResult(sBuilder,
-                    new StringBuilder(512).append("Parameter ")
-                            .append(WebFieldDef.SRCGROUPNAME.name).append(": not found ")
-                            .append(srcGroupName).append(" subscribed topic set!").toString());
-            return;
-        }
         // query offset from source group
         Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
                 broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
@@ -904,6 +883,56 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
     }
 
+    /***
+     * Remove consume group offset.
+     *
+     * @param req
+     * @param sBuilder process result
+     */
+    public void adminRemoveGroupOffSet(HttpServletRequest req,
+                                      StringBuilder sBuilder) {
+        ProcessResult result = new ProcessResult();
+        // get consume group name
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSGROUPNAME, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Set<String> groupNameSet = (Set<String>) result.retData1;
+        // get modify user
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        final String modifier = (String) result.retData1;
+        // get need removed offset's topic
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        // get target consume group name
+        Set<String> topicNameSet = (Set<String>) result.retData1;
+        // get set mode
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.ONLYMEM, false, false, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        boolean onlyMemory = (Boolean) result.retData1;
+        if (!validAndGetGroupTopicInfo(groupNameSet, topicNameSet, result)) {
+            WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+            return;
+        }
+        Map<String, Map<String, Set<Integer>>> groupTopicPartMap =
+                (Map<String, Map<String, Set<Integer>>>) result.retData1;
+        broker.getOffsetManager().deleteGroupOffset(
+                onlyMemory, groupTopicPartMap, modifier);
+        // builder return result
+        sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+    }
+
     // build reset offset info
     private List<Tuple3<String, Integer, Long>> buildOffsetResetInfo(
             Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap) {
@@ -950,8 +979,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         List<Tuple3<String, Integer, Long>> result = new ArrayList<>();
         MessageStoreManager storeManager = broker.getStoreManager();
         // get topic's partition set
-        Map<String, Set<Integer>> topicPartMap =
-                validAndGetPartitions(null, topicSet);
+        Map<String, Set<Integer>> topicPartMap = getTopicPartitions(topicSet);
         // fill current topic's max offset value
         for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
             if (entry.getKey() == null
@@ -979,15 +1007,15 @@ public class BrokerAdminServlet extends AbstractWebHandler {
     }
 
     // build reset offset info
-    private ProcessResult validManOffsetResetInfo(WebFieldDef fieldDef,
-                                                  Map<String, Long> manOffsetInfoMap) {
+    private boolean validManOffsetResetInfo(WebFieldDef fieldDef,
+                                            Map<String, Long> manOffsetInfoMap,
+                                            ProcessResult result) {
         String brokerId;
         String topicName;
         String strPartId;
         int partitionId;
         long adjOffset;
         MessageStore store = null;
-        ProcessResult procResult = new ProcessResult();
         MessageStoreManager storeManager = broker.getStoreManager();
         List<Tuple3<String, Integer, Long>> offsetVals = new ArrayList<>();
         String localBrokerId = String.valueOf(broker.getTubeConfig().getBrokerId());
@@ -1001,12 +1029,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             // parse and check partitionKey value
             String[] keyItems = entry.getKey().split(TokenConstants.ATTR_SEP);
             if (keyItems.length != 3) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append("'s key invalid:")
                                 .append(entry.getKey())
                                 .append(" must be brokerId:topicName:partitionId !").toString());
-                return procResult;
+                return result.success;
             }
             brokerId = keyItems[0].trim();
             topicName = keyItems[1].trim();
@@ -1018,12 +1046,12 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             try {
                 partitionId = Integer.parseInt(strPartId);
             } catch (NumberFormatException e) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append("'s key invalid:")
                                 .append(entry.getKey())
                                 .append("'s partitionId value not number!").toString());
-                return procResult;
+                return result.success;
             }
             // check and adjust offset value
             try {
@@ -1041,65 +1069,129 @@ public class BrokerAdminServlet extends AbstractWebHandler {
             offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
         }
         if (offsetVals.isEmpty()) {
-            procResult.setFailResult(fieldDef.id,
+            result.setFailResult(fieldDef.id,
                     new StringBuilder(512).append("Parameter ")
-                            .append(fieldDef.name)
-                            .append("'s value is invalid!").toString());
+                            .append(fieldDef.name).append("'s value is invalid!").toString());
         } else {
-            procResult.setSuccResult(offsetVals);
+            result.setSuccResult(offsetVals);
         }
-        return procResult;
+        return result.success;
     }
 
     // builder group's offset info
     private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
-            Set<String> groupSet, Set<String> topicSet) {
-        long curReadDataOffset = -2;
-        long curDataLag = -2;
+            WebFieldDef groupFldDef, Set<String> groupSet, Set<String> topicSet) {
+        ProcessResult result = new ProcessResult();
         Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>();
         for (String group : groupSet) {
             Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>();
             // valid and get topic's partitionIds
-            Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet);
-            // get topic's publish info
-            Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
-                    broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
-            // get group's booked offset info
-            Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
-                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
-            // get offset info array
-            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
-                String topic = entry.getKey();
-                Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
-                Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
-                Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
-                for (Integer partitionId : entry.getValue()) {
-                    GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
-                    offsetInfo.setPartPubStoreInfo(storeInfoMap.get(partitionId));
-                    offsetInfo.setConsumeOffsetInfo(partBookedMap.get(partitionId));
-                    String queryKey = buildQueryID(group, topic, partitionId);
-                    ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
-                    if (nodeInfo != null) {
-                        offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+            if (validAndGetTopicPartInfo(group, groupFldDef, topicSet, result)) {
+                Map<String, Set<Integer>> topicPartMap =
+                        (Map<String, Set<Integer>>) result.retData1;
+                // get topic's publish info
+                Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+                        broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
+                // get group's booked offset info
+                Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
+                        broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+                // get offset info array
+                for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                    String topic = entry.getKey();
+                    Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
+                    Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
+                    Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
+                    for (Integer partitionId : entry.getValue()) {
+                        GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
+                        offsetInfo.setPartPubStoreInfo(
+                                storeInfoMap == null ? null :storeInfoMap.get(partitionId));
+                        offsetInfo.setConsumeOffsetInfo(
+                                partBookedMap == null ? null : partBookedMap.get(partitionId));
+                        String queryKey = buildQueryID(group, topic, partitionId);
+                        ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
+                        if (nodeInfo != null) {
+                            offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+                        }
+                        offsetInfo.calculateLag();
+                        partOffsetRet.put(partitionId, offsetInfo);
                     }
-                    offsetInfo.calculateLag();
-                    partOffsetRet.put(partitionId, offsetInfo);
+                    topicOffsetRet.put(topic, partOffsetRet);
                 }
-                topicOffsetRet.put(topic, partOffsetRet);
             }
             groupOffsetMaps.put(group, topicOffsetRet);
         }
         return groupOffsetMaps;
     }
 
+    // valid and get need removed group-topic info
+    private boolean validAndGetGroupTopicInfo(Set<String> groupSet,
+                                              Set<String> topicSet,
+                                              ProcessResult result) {
+        Map<String, Map<String, Set<Integer>>> groupTopicPartMap = new HashMap<>();
+        // filter group
+        Set<String> targetGroupSet = new HashSet<>();
+        Set<String> bookedGroups = broker.getOffsetManager().getBookedGroups();
+        for (String orgGroup : groupSet) {
+            if (bookedGroups.contains(orgGroup)) {
+                targetGroupSet.add(orgGroup);
+            }
+        }
+        // valid specified topic set
+        for (String group : targetGroupSet) {
+            if (validAndGetTopicPartInfo(group, WebFieldDef.GROUPNAME, topicSet, result)) {
+                Map<String, Set<Integer>> topicPartMap =
+                        (Map<String, Set<Integer>>) result.retData1;
+                groupTopicPartMap.put(group, topicPartMap);
+            }
+        }
+        result.setSuccResult(groupTopicPartMap);
+        return true;
+    }
 
-    private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
-        Map<String, Set<Integer>> topicPartMap = new HashMap<>();
-        // query stored topic set stored in memory or zk
-        if (topicSet.isEmpty() && group != null) {
-            topicSet = broker.getOffsetManager().getGroupSubInfo(group);
+    private boolean validAndGetTopicPartInfo(String groupName,
+                                             WebFieldDef groupFldDef,
+                                             Set<String> topicSet,
+                                             ProcessResult result) {
+        Set<String> subTopicSet =
+                broker.getOffsetManager().getGroupSubInfo(groupName);
+        if (subTopicSet == null || subTopicSet.isEmpty()) {
+            result.setFailResult(400, new StringBuilder(512)
+                    .append("Parameter ").append(groupFldDef.name)
+                    .append(": subscribed topic set of ").append(groupName)
+                    .append(" query result is null!").toString());
+            return result.success;
         }
-        // get topic's partitionIds
+        // filter valid topic set
+        Set<String> tgtTopicSet = new HashSet<>();
+        if (topicSet.isEmpty()) {
+            tgtTopicSet = subTopicSet;
+        } else {
+            for (String topic : topicSet) {
+                if (subTopicSet.contains(topic)) {
+                    tgtTopicSet.add(topic);
+                }
+            }
+            if (tgtTopicSet.isEmpty()) {
+                result.setFailResult(400, new StringBuilder(512)
+                        .append("Parameter ").append(groupFldDef.name)
+                        .append(": ").append(groupName)
+                        .append(" unsubscribed to the specified topic set!").toString());
+                return result.success;
+            }
+        }
+        Map<String, Set<Integer>> topicPartMap = getTopicPartitions(tgtTopicSet);
+        if (topicPartMap.isEmpty()) {
+            result.setFailResult(400, new StringBuilder(512)
+                    .append("Parameter ").append(groupFldDef.name)
+                    .append(": all topics subscribed by the group have been deleted!").toString());
+            return result.success;
+        }
+        result.setSuccResult(topicPartMap);
+        return result.success;
+    }
+
+    private Map<String, Set<Integer>> getTopicPartitions(Set<String> topicSet) {
+        Map<String, Set<Integer>> topicPartMap = new HashMap<>();
         if (topicSet != null) {
             Map<String, TopicMetadata> topicConfigMap =
                     broker.getMetadataManager().getTopicConfigMap();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 45b862d..a65a223 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -81,8 +81,10 @@ public enum WebFieldDef {
             TBaseConstants.META_MAX_GROUPNAME_LENGTH, RegexDef.TMP_GROUP),
     MANUALSET(20, "manualSet", "manSet",
             WebFieldType.BOOLEAN, "Whether manual offset setting mode"),
-    OFFSETJSON(21, "offsetJsonSet", "offsetSet",
-            WebFieldType.JSONTYPE, "The offset set that needs to be added or modified");
+    OFFSETJSON(21, "offsetJsonInfo", "offsetInfo",
+            WebFieldType.JSONTYPE, "The offset info that needs to be added or modified"),
+    ONLYMEM(22, "onlyMemory", "onlyMem", WebFieldType.BOOLEAN,
+            "Only clear the offset data in the memory cache, default is false");
 
 
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
index 5ac3683..8e677a5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fileconfig/ZKConfig.java
@@ -28,7 +28,6 @@ public class ZKConfig {
     private int zkSyncTimeMs = 1000;
     private long zkCommitPeriodMs = 5000L;
     private int zkCommitFailRetries = TServerConstants.CFG_ZK_COMMIT_DEFAULT_RETRIES;
-
     public ZKConfig() {
 
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
index dca7ca8..f5eee40 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/OffsetStorage.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.server.common.offsetstorage;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,10 +34,12 @@ public interface OffsetStorage {
                       final Collection<OffsetStorageInfo> offsetInfoList,
                       boolean isFailRetry);
 
-    Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos();
+    Map<String, Set<String>> queryZkAllGroupTopicInfos();
 
-    Map<String, Set<String>> getZkLocalGroupTopicInfos();
+    Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet);
 
     Map<Integer, Long> queryGroupOffsetInfo(String group, String topic,
                                             Set<Integer> partitionIds);
+
+    void deleteGroupOffsetInfo(Map<String, Map<String, Set<Integer>>> groupTopicPartMap);
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index 8094151..3700753 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -35,6 +35,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * A offset storage implementation with zookeeper
  */
@@ -64,18 +66,16 @@ public class ZkOffsetStorage implements OffsetStorage {
     private final String consumerZkDir;
     private final boolean isBroker;
     private final int brokerId;
+    private final String strBrokerId;
     private ZKConfig zkConfig;
     private ZooKeeperWatcher zkw;
-    // group-topic-brokerid
-    private final Map<String, Map<String, Set<String>>> zkGroupTopicBrokerInfos = new HashMap<>();
-    // group-topic
-    private final Map<String, Set<String>> zkLocalGroupTopicInfos = new HashMap<>();
 
 
     public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
         this.zkConfig = zkConfig;
-        this.brokerId = brokerId;
         this.isBroker = isBroker;
+        this.brokerId = brokerId;
+        this.strBrokerId = String.valueOf(brokerId);
         this.tubeZkRoot = normalize(this.zkConfig.getZkNodeRoot());
         this.consumerZkDir = this.tubeZkRoot + "/consumers-v3";
         try {
@@ -86,8 +86,6 @@ public class ZkOffsetStorage implements OffsetStorage {
                     .append(this.zkConfig.getZkServerAddr()).append(") !").toString(), e);
             System.exit(1);
         }
-        logger.info("[ZkOffsetStorage] Get group-topic-broker info from ZooKeeper");
-        queryAllZKGroupTopicInfo();
         logger.info("[ZkOffsetStorage] ZooKeeper Offset Storage initiated!");
     }
 
@@ -102,16 +100,6 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     @Override
-    public Map<String, Map<String, Set<String>>> getZkGroupTopicBrokerInfos() {
-        return zkGroupTopicBrokerInfos;
-    }
-
-    @Override
-    public Map<String, Set<String>> getZkLocalGroupTopicInfos() {
-        return zkLocalGroupTopicInfos;
-    }
-
-    @Override
     public void commitOffset(final String group,
                              final Collection<OffsetStorageInfo> offsetInfoList,
                              boolean isFailRetry) {
@@ -242,63 +230,130 @@ public class ZkOffsetStorage implements OffsetStorage {
     }
 
     /**
-     * Get group-topic-brokerid map info stored in zookeeper.
-     * <p/>
-     * The broker only cares about the content of its own node,
-     * so this part only queries when the node starts, and
-     * caches relevant data in the memory for finding
-     *
+     * Query booked topic info of groups stored in zookeeper.
+     * @param groupSet query groups
+     * @return group--topic map info
      */
-    private void queryAllZKGroupTopicInfo() {
+    @Override
+    public Map<String, Set<String>> queryZKGroupTopicInfo(List<String> groupSet) {
+        String qryBrokerId;
+        Map<String, Set<String>> groupTopicMap = new HashMap<>();
         StringBuilder sBuider = new StringBuilder(512);
-        // get all booked groups name
+        if (groupSet == null || groupSet.isEmpty()) {
+            return groupTopicMap;
+        }
+        // build path base
         String groupNode = sBuider.append(this.consumerZkDir).toString();
-        List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
         sBuider.delete(0, sBuider.length());
-        if (bookedGroups != null) {
-            // get topic info by group
-            for (String group : bookedGroups) {
-                String topicNode = sBuider.append(groupNode)
-                        .append("/").append(group).append("/offsets").toString();
-                List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
-                sBuider.delete(0, sBuider.length());
-                Set<String> topicSet = new HashSet<>();
-                Map<String, Set<String>> topicBrokerSet = new HashMap<>();
-                if (consumeTopics != null) {
-                    // get broker info by topic
-                    for (String topic : consumeTopics) {
-                        String brokerNode = sBuider.append(topicNode)
-                                .append("/").append(topic).toString();
-                        List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
-                        sBuider.delete(0, sBuider.length());
-                        Set<String> brokerIdSet = new HashSet<>();
-                        if (brokerIds != null) {
-                            for (String idStr : brokerIds) {
-                                if (idStr != null) {
-                                    String[] brokerPartIdStrs =
-                                            idStr.split(TokenConstants.HYPHEN);
-                                    brokerIdSet.add(brokerPartIdStrs[0]);
+        // get the group managed by this broker
+        for (String group : groupSet) {
+            String topicNode = sBuider.append(groupNode)
+                    .append("/").append(group).append("/offsets").toString();
+            List<String> consumeTopics = ZKUtil.getChildren(this.zkw, topicNode);
+            sBuider.delete(0, sBuider.length());
+            Set<String> topicSet = new HashSet<>();
+            if (consumeTopics != null) {
+                for (String topic : consumeTopics) {
+                    if (topic == null) {
+                        continue;
+                    }
+                    String brokerNode = sBuider.append(topicNode)
+                            .append("/").append(topic).toString();
+                    List<String> brokerIds = ZKUtil.getChildren(this.zkw, brokerNode);
+                    sBuider.delete(0, sBuider.length());
+                    if (brokerIds != null) {
+                        for (String idStr : brokerIds) {
+                            if (idStr != null) {
+                                String[] brokerPartIdStrs =
+                                        idStr.split(TokenConstants.HYPHEN);
+                                qryBrokerId = brokerPartIdStrs[0];
+                                if (qryBrokerId != null
+                                        && strBrokerId.equals(qryBrokerId.trim())) {
+                                    topicSet.add(topic);
+                                    break;
                                 }
                             }
-                            if (isBroker && brokerIdSet.contains(String.valueOf(brokerId))) {
-                                topicSet.add(topic);
-                            }
                         }
-                        topicBrokerSet.put(topic, brokerIdSet);
                     }
                 }
-                if (!topicSet.isEmpty()) {
-                    zkLocalGroupTopicInfos.put(group, topicSet);
+            }
+            if (!topicSet.isEmpty()) {
+                groupTopicMap.put(group, topicSet);
+            }
+        }
+        return groupTopicMap;
+    }
+
+    /**
+     * Get group-topic map info stored in zookeeper.
+     * <p/>
+     * The broker only cares about the content of its own node
+     *
+     */
+    @Override
+    public Map<String, Set<String>> queryZkAllGroupTopicInfos() {
+        StringBuilder sBuider = new StringBuilder(512);
+        // get all booked groups name
+        String groupNode = sBuider.append(this.consumerZkDir).toString();
+        List<String> bookedGroups = ZKUtil.getChildren(this.zkw, groupNode);
+        return queryZKGroupTopicInfo(bookedGroups);
+    }
+
+    /**
+     * Get offset stored in zookeeper, if not found or error, set null
+     * <p/>
+     *
+     * @return partitionId--offset map info
+     */
+    @Override
+    public void deleteGroupOffsetInfo(
+            Map<String, Map<String, Set<Integer>>> groupTopicPartMap) {
+        StringBuilder sBuider = new StringBuilder(512);
+        for (Map.Entry<String, Map<String, Set<Integer>>> entry
+                : groupTopicPartMap.entrySet()) {
+            if (entry.getKey() == null
+                    || entry.getValue() == null
+                    || entry.getValue().isEmpty()) {
+                continue;
+            }
+            String basePath = sBuider.append(this.consumerZkDir).append("/")
+                    .append(entry.getKey()).append("/offsets").toString();
+            sBuider.delete(0, sBuider.length());
+            Map<String, Set<Integer>> topicPartMap = entry.getValue();
+            for (Map.Entry<String, Set<Integer>> topicEntry : topicPartMap.entrySet()) {
+                if (topicEntry.getKey() == null
+                        || topicEntry.getValue() == null
+                        || topicEntry.getValue().isEmpty()) {
+                    continue;
+                }
+                Set<Integer> partIdSet = topicEntry.getValue();
+                for (Integer partitionId : partIdSet) {
+                    String offsetNode = sBuider.append(basePath).append("/")
+                            .append(topicEntry.getKey()).append("/")
+                            .append(brokerId).append(TokenConstants.HYPHEN)
+                            .append(partitionId).toString();
+                    sBuider.delete(0, sBuider.length());
+                    ZKUtil.delZNode(this.zkw, offsetNode);
                 }
-                zkGroupTopicBrokerInfos.put(group, topicBrokerSet);
+                String parentNode = sBuider.append(basePath).append("/")
+                        .append(topicEntry.getKey()).toString();
+                sBuider.delete(0, sBuider.length());
+                chkAndRmvBlankParentNode(parentNode);
             }
+            chkAndRmvBlankParentNode(basePath);
+            String parentNode = sBuider.append(this.consumerZkDir)
+                    .append("/").append(entry.getKey()).toString();
+            sBuider.delete(0, sBuider.length());
+            chkAndRmvBlankParentNode(parentNode);
         }
-        logger.info(new StringBuilder(256)
-                .append("[ZkOffsetStorage] query from zookeeper, total group size = ")
-                .append(zkGroupTopicBrokerInfos.size()).append(", local group size = ")
-                .append(zkLocalGroupTopicInfos.size()).toString());
     }
 
+    private void chkAndRmvBlankParentNode(String parentNode) {
+        List<String> nodeSet = ZKUtil.getChildren(zkw, parentNode);
+        if (nodeSet != null && nodeSet.isEmpty()) {
+            ZKUtil.delZNode(this.zkw, parentNode);
+        }
+    }
 
     private String normalize(final String root) {
         if (root.startsWith("/")) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
index da86191..5eb6cea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZKUtil.java
@@ -167,6 +167,20 @@ public class ZKUtil {
         }
     }
 
+    /**
+     * delete the specified znode.
+     *
+     * @param zkw   zk reference
+     * @param znode path of node
+     */
+    public static void delZNode(ZooKeeperWatcher zkw, String znode) {
+        try {
+            zkw.getRecoverableZooKeeper().delete(znode, -1);
+        } catch (Throwable e) {
+            //
+        }
+    }
+
     private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
                                           boolean watcherSet) throws KeeperException {
         try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
index 3688b1c..8cabf67 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java
@@ -44,10 +44,13 @@ public class ProcessResult {
         this.success = false;
         this.errCode = errCode;
         this.errInfo = errMsg;
+        this.retData1 = null;
     }
 
     public void setSuccResult(Object retData) {
         this.success = true;
+        this.errInfo = "";
+        this.errCode = TErrCodeConstants.SUCCESS;
         this.retData1 = retData;
     }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index fddd5de..2318bdb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -249,32 +249,32 @@ public class WebParameterUtils {
      * @param fieldDef   the parameter field definition
      * @param required   a boolean value represent whether the parameter is must required
      * @param defValue   a default value returned if failed to parse value from the given object
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getLongParamValue(HttpServletRequest req,
-                                                  WebFieldDef fieldDef,
-                                                  boolean required,
-                                                  long defValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
-        }
-        String paramValue = (String) procResult.retData1;
+    public static boolean getLongParamValue(HttpServletRequest req,
+                                            WebFieldDef fieldDef,
+                                            boolean required,
+                                            long defValue,
+                                            ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
+        }
+        String paramValue = (String) result.retData1;
         if (paramValue == null) {
-            procResult.setSuccResult(defValue);
-            return procResult;
+            result.setSuccResult(defValue);
+            return result.success;
         }
         try {
             long paramIntVal = Long.parseLong(paramValue);
-            procResult.setSuccResult(paramIntVal);
+            result.setSuccResult(paramIntVal);
         } catch (Throwable e) {
-            procResult.setFailResult(400,
+            result.setFailResult(400,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name).append(" parse error: ")
                             .append(e.getMessage()).toString());
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -283,43 +283,44 @@ public class WebParameterUtils {
      * @param req        Http Servlet Request
      * @param fieldDef   the parameter field definition
      * @param required   a boolean value represent whether the parameter is must required
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getIntParamValue(HttpServletRequest req,
-                                                 WebFieldDef fieldDef,
-                                                 boolean required) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
+    public static boolean getIntParamValue(HttpServletRequest req,
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef,
+                required, null, result)) {
+            return result.success;
         }
-        ProcessResult procRet = new ProcessResult();
         Set<Integer> tgtValueSet = new HashSet<Integer>();
         if (fieldDef.isCompFieldType()) {
-            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            Set<String> valItemSet = (Set<String>) result.retData1;
             if (valItemSet.isEmpty()) {
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) {
-                    return procRet;
+                if (!checkIntValueNorms(fieldDef,
+                        itemVal, false, -1, result)) {
+                    return result.success;
                 }
-                tgtValueSet.add((Integer) procRet.retData1);
+                tgtValueSet.add((Integer) result.retData1);
             }
         } else {
-            String paramValue = (String) procResult.retData1;
+            String paramValue = (String) result.retData1;
             if (paramValue == null) {
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
-            if (!checkIntValueNorms(procRet,
-                    fieldDef, paramValue, false, -1)) {
-                tgtValueSet.add((Integer) procRet.retData1);
+            if (!checkIntValueNorms(fieldDef,
+                    paramValue, false, -1, result)) {
+                tgtValueSet.add((Integer) result.retData1);
             }
         }
-        procResult.setSuccResult(tgtValueSet);
-        return procResult;
+        result.setSuccResult(tgtValueSet);
+        return result.success;
     }
 
     /**
@@ -330,43 +331,44 @@ public class WebParameterUtils {
      * @param required   a boolean value represent whether the parameter is must required
      * @param defValue   a default value returned if failed to parse value from the given object
      * @param minValue   min value required
-     * @return valid result for the parameter value
+     * @param result     process result of parameter value
+     * @return process result
      */
-    public static ProcessResult getIntParamValue(HttpServletRequest req,
+    public static boolean getIntParamValue(HttpServletRequest req,
                                                  WebFieldDef fieldDef,
                                                  boolean required,
                                                  int defValue,
-                                                 int minValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
+                                                 int minValue,
+                                                 ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
         }
         if (fieldDef.isCompFieldType()) {
             Set<Integer> tgtValueSet = new HashSet<Integer>();
-            Set<String> valItemSet = (Set<String>) procResult.retData1;
+            Set<String> valItemSet = (Set<String>) result.retData1;
             if (valItemSet.isEmpty()) {
                 tgtValueSet.add(defValue);
-                procResult.setSuccResult(tgtValueSet);
-                return procResult;
+                result.setSuccResult(tgtValueSet);
+                return result.success;
             }
-            ProcessResult procRet = new ProcessResult();
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(procRet, fieldDef, itemVal, true, minValue)) {
-                    return procRet;
+                if (!checkIntValueNorms(fieldDef,
+                        itemVal, true, minValue, result)) {
+                    return result.success;
                 }
-                tgtValueSet.add((Integer) procRet.retData1);
+                tgtValueSet.add((Integer) result.retData1);
             }
-            procResult.setSuccResult(tgtValueSet);
+            result.setSuccResult(tgtValueSet);
         } else {
-            String paramValue = (String) procResult.retData1;
+            String paramValue = (String) result.retData1;
             if (paramValue == null) {
-                procResult.setSuccResult(defValue);
-                return procResult;
+                result.setSuccResult(defValue);
+                return result.success;
             }
-            checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue);
+            checkIntValueNorms(fieldDef,
+                    paramValue, true, minValue, result);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -376,24 +378,24 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required    a boolean value represent whether the parameter is must required
      * @param defValue    a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getBooleanParamValue(HttpServletRequest req,
-                                                     WebFieldDef fieldDef,
-                                                     boolean required,
-                                                     boolean defValue) {
-        ProcessResult procResult =
-                getStringParamValue(req, fieldDef, required, null);
-        if (!procResult.success) {
-            return procResult;
-        }
-        String paramValue = (String) procResult.retData1;
+    public static boolean getBooleanParamValue(HttpServletRequest req,
+                                               WebFieldDef fieldDef,
+                                               boolean required,
+                                               boolean defValue,
+                                               ProcessResult result) {
+        if (!getStringParamValue(req, fieldDef, required, null, result)) {
+            return result.success;
+        }
+        String paramValue = (String) result.retData1;
         if (paramValue == null) {
-            procResult.setSuccResult(defValue);
-            return procResult;
+            result.setSuccResult(defValue);
+            return result.success;
         }
-        procResult.setSuccResult(Boolean.parseBoolean(paramValue));
-        return procResult;
+        result.setSuccResult(Boolean.parseBoolean(paramValue));
+        return result.success;
     }
 
     /**
@@ -403,13 +405,14 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required     a boolean value represent whether the parameter is must required
      * @param defValue     a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getStringParamValue(HttpServletRequest req,
-                                                    WebFieldDef fieldDef,
-                                                    boolean required,
-                                                    String defValue) {
-        ProcessResult procResult = new ProcessResult();
+    public static boolean getStringParamValue(HttpServletRequest req,
+                                              WebFieldDef fieldDef,
+                                              boolean required,
+                                              String defValue,
+                                              ProcessResult result) {
         // get parameter value
         String paramValue = req.getParameter(fieldDef.name);
         if (paramValue == null) {
@@ -422,14 +425,14 @@ public class WebParameterUtils {
         // Check if the parameter exists
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append(" is missing or value is null or blank!").toString());
             } else {
-                procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
             }
-            return procResult;
+            return result.success;
         }
         // check if value is norm;
         if (fieldDef.isCompFieldType()) {
@@ -440,41 +443,41 @@ public class WebParameterUtils {
                 if (TStringUtils.isBlank(strParamValueItem)) {
                     continue;
                 }
-                if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) {
-                    return procResult;
+                if (!checkStrValueNorms(fieldDef, strParamValueItem, result)) {
+                    return result.success;
                 }
-                valItemSet.add((String) procResult.retData1);
+                valItemSet.add((String) result.retData1);
             }
             // check if is empty result
             if (valItemSet.isEmpty()) {
                 if (required) {
-                    procResult.setFailResult(fieldDef.id,
+                    result.setFailResult(fieldDef.id,
                             new StringBuilder(512).append("Parameter ")
                                     .append(fieldDef.name)
                                     .append(" is missing or value is null or blank!").toString());
                 } else {
-                    procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue);
+                    procStringDefValue(fieldDef.isCompFieldType(), defValue, result);
                 }
-                return procResult;
+                return result.success;
             }
             // check max item count
             if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) {
                 if (valItemSet.size() > fieldDef.itemMaxCnt) {
-                    procResult.setFailResult(fieldDef.id,
+                    result.setFailResult(fieldDef.id,
                             new StringBuilder(512).append("Parameter ")
                                     .append(fieldDef.name)
                                     .append("'s item count over max allowed count (")
                                     .append(fieldDef.itemMaxCnt).append(")!").toString());
                 }
             }
-            procResult.setSuccResult(valItemSet);
+            result.setSuccResult(valItemSet);
         } else {
-            if (!checkStrValueNorms(procResult, fieldDef, paramValue)) {
-                return procResult;
+            if (!checkStrValueNorms(fieldDef, paramValue, result)) {
+                return result.success;
             }
-            procResult.setSuccResult(paramValue);
+            result.setSuccResult(paramValue);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
@@ -484,13 +487,14 @@ public class WebParameterUtils {
      * @param fieldDef    the parameter field definition
      * @param required    a boolean value represent whether the parameter is must required
      * @param defValue    a default value returned if failed to parse value from the given object
+     * @param result      process result
      * @return valid result for the parameter value
      */
-    public static ProcessResult getJsonDictParamValue(HttpServletRequest req,
-                                                      WebFieldDef fieldDef,
-                                                      boolean required,
-                                                      Map<String, Long> defValue) {
-        ProcessResult procResult = new ProcessResult();
+    public static boolean getJsonDictParamValue(HttpServletRequest req,
+                                                WebFieldDef fieldDef,
+                                                boolean required,
+                                                Map<String, Long> defValue,
+                                                ProcessResult result) {
         // get parameter value
         String paramValue = req.getParameter(fieldDef.name);
         if (paramValue == null) {
@@ -503,20 +507,20 @@ public class WebParameterUtils {
         // Check if the parameter exists
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append(" is missing or value is null or blank!").toString());
             } else {
-                procResult.setSuccResult(defValue);
+                result.setSuccResult(defValue);
             }
-            return procResult;
+            return result.success;
         }
         try {
             paramValue = URLDecoder.decode(paramValue,
                     TBaseConstants.META_DEFAULT_CHARSET_NAME);
         } catch (UnsupportedEncodingException e) {
-            procResult.setFailResult(fieldDef.id,
+            result.setFailResult(fieldDef.id,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name)
                             .append(" decode error, exception is ")
@@ -524,73 +528,83 @@ public class WebParameterUtils {
         }
         if (TStringUtils.isBlank(paramValue)) {
             if (required) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append("'s value is blank!").toString());
             } else {
-                procResult.setSuccResult(defValue);
+                result.setSuccResult(defValue);
             }
-            return procResult;
+            return result.success;
         }
         if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
             if (paramValue.length() > fieldDef.valMaxLen) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name)
                                 .append("'s length over max allowed length (")
                                 .append(fieldDef.valMaxLen).append(")!").toString());
-                return procResult;
+                return result.success;
             }
         }
-        procResult.setSuccResult(new Gson().fromJson(paramValue,
-                new TypeToken<Map<String, Long>>(){}.getType()));
-        return procResult;
+        // parse data
+        try {
+            Map<String, Long> manOffsets = new Gson().fromJson(paramValue,
+                    new TypeToken<Map<String, Long>>(){}.getType());
+            result.setSuccResult(manOffsets);
+        } catch (Throwable e) {
+            result.setFailResult(fieldDef.id,
+                    new StringBuilder(512).append("Parameter ")
+                            .append(fieldDef.name)
+                            .append(" value parse failure, error is ")
+                            .append(e.getMessage()).append("!").toString());
+        }
+        return result.success;
     }
 
     /**
      * process string default value
      *
-     * @param procResult process result
      * @param isCompFieldType   the parameter if compound field type
      * @param defValue   the parameter default value
+     * @param result process result
      * @return process result for default value of parameter
      */
-    private static ProcessResult procStringDefValue(ProcessResult procResult,
-                                                    boolean isCompFieldType,
-                                                    String defValue) {
+    private static boolean procStringDefValue(boolean isCompFieldType,
+                                              String defValue,
+                                              ProcessResult result) {
         if (isCompFieldType) {
             Set<String> valItemSet = new HashSet<>();
             if (TStringUtils.isNotBlank(defValue)) {
                 valItemSet.add(defValue);
             }
-            procResult.setSuccResult(valItemSet);
+            result.setSuccResult(valItemSet);
         } else {
-            procResult.setSuccResult(defValue);
+            result.setSuccResult(defValue);
         }
-        return procResult;
+        return result.success;
     }
 
     /**
      * Parse the parameter string value by regex define
      *
-     * @param procResult   process result
      * @param fieldDef     the parameter field definition
      * @param paramVal     the parameter value
+     * @param result       process result
      * @return check result for string value of parameter
      */
-    private static boolean checkStrValueNorms(ProcessResult procResult,
-                                              WebFieldDef fieldDef,
-                                              String paramVal) {
+    private static boolean checkStrValueNorms(WebFieldDef fieldDef,
+                                              String paramVal,
+                                              ProcessResult result) {
         paramVal = paramVal.trim();
         if (TStringUtils.isBlank(paramVal)) {
-            procResult.setSuccResult(null);
+            result.setSuccResult(null);
             return true;
         }
         // check value's max length
         if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
             if (paramVal.length() > fieldDef.valMaxLen) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("over max length for ")
                                 .append(fieldDef.name).append(", only allow ")
                                 .append(fieldDef.valMaxLen).append(" length").toString());
@@ -600,44 +614,44 @@ public class WebParameterUtils {
         // check value's pattern
         if (fieldDef.regexCheck) {
             if (!paramVal.matches(fieldDef.regexDef.getPattern())) {
-                procResult.setFailResult(fieldDef.id,
+                result.setFailResult(fieldDef.id,
                         new StringBuilder(512).append("illegal value for ")
                                 .append(fieldDef.name).append(", value ")
                                 .append(fieldDef.regexDef.getErrMsgTemp()).toString());
                 return false;
             }
         }
-        procResult.setSuccResult(paramVal);
+        result.setSuccResult(paramVal);
         return true;
     }
 
     /**
      * Parse the parameter string value by regex define
      *
-     * @param procResult   process result
      * @param fieldDef     the parameter field definition
      * @param paramValue   the parameter value
      * @param hasMinVal    whether there is a minimum
      * param minValue      the parameter min value
+     * @param result   process result
      * @return check result for string value of parameter
      */
-    private static boolean checkIntValueNorms(ProcessResult procResult,
-                                              WebFieldDef fieldDef,
+    private static boolean checkIntValueNorms(WebFieldDef fieldDef,
                                               String paramValue,
                                               boolean hasMinVal,
-                                              int minValue) {
+                                              int minValue,
+                                              ProcessResult result) {
         try {
             int paramIntVal = Integer.parseInt(paramValue);
             if (hasMinVal && paramIntVal < minValue) {
-                procResult.setFailResult(400,
+                result.setFailResult(400,
                         new StringBuilder(512).append("Parameter ")
                                 .append(fieldDef.name).append(" value must >= ")
                                 .append(minValue).toString());
                 return false;
             }
-            procResult.setSuccResult(paramIntVal);
+            result.setSuccResult(paramIntVal);
         } catch (Throwable e) {
-            procResult.setFailResult(400,
+            result.setFailResult(400,
                     new StringBuilder(512).append("Parameter ")
                             .append(fieldDef.name).append(" parse error: ")
                             .append(e.getMessage()).toString());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 3bdfe16..8bef5ab 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -207,7 +207,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                         WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
                                 req.getParameter("memCacheMsgSizeInMB"),
                                 false, defmemCacheMsgSizeInMB, 2);
-                memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+                memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
                 int memCacheFlushIntvl =
                         WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                 req.getParameter("memCacheFlushIntvl"),
@@ -354,7 +354,7 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
                             WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
                                     jsonObject.get("memCacheMsgSizeInMB"),
                                     false, brokerConfEntity.getDftMemCacheMsgSizeInMB(), 2);
-                    memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB;
+                    memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
                     int memCacheFlushIntvl =
                             WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                     jsonObject.get("memCacheFlushIntvl"),
@@ -714,10 +714,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
      * @return
      */
     public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
-        ProcessResult result = WebParameterUtils.getIntParamValue(req,
-                WebFieldDef.COMPSBROKERID, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }
@@ -752,17 +752,16 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler {
      * @return
      */
     public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) {
+        ProcessResult result = new ProcessResult();
         StringBuilder strBuffer = new StringBuilder(512);
-        ProcessResult result = WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null);
-        if (!result.success) {
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }
         Set<String> topicNameSet = (Set<String>) result.retData1;
-        result = WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.WITHIP, false, false);
-        if (!result.success) {
+        if (!WebParameterUtils.getBooleanParamValue(req,
+                WebFieldDef.WITHIP, false, false, result)) {
             WebParameterUtils.buildFailResult(strBuffer, result.errInfo);
             return strBuffer;
         }